diff --git a/rrompy/hfengines/base/matrix_engine_base.py b/rrompy/hfengines/base/matrix_engine_base.py index fa9e3b2..c6c63fe 100644 --- a/rrompy/hfengines/base/matrix_engine_base.py +++ b/rrompy/hfengines/base/matrix_engine_base.py @@ -1,479 +1,479 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np import scipy.sparse as scsp from matplotlib import pyplot as plt from copy import deepcopy as copy, copy as softcopy from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, strLst, TupleAny, List, ListAny, DictAny, paramVal, paramList, sampList) from rrompy.utilities.base import (purgeList, getNewFilename, verbosityManager as vbMng) from rrompy.utilities.expression import (expressionEvaluator, createMonomial, createMonomialList) from rrompy.utilities.numerical import (hashDerivativeToIdx as hashD, solve, dot) from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import sampleList, emptySampleList from rrompy.solver import setupSolver, Np2DLikeEye __all__ = ['MatrixEngineBase'] class MatrixEngineBase: """ Generic solver for parametric matrix problems. Attributes: verbosity: Verbosity level. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. energyNormMatrix: Scipy sparse matrix representing inner product. energyNormDualMatrix: Scipy sparse matrix representing dual inner product. dualityMatrix: Scipy sparse matrix representing duality. energyNormPartialDualMatrix: Scipy sparse matrix representing dual inner product without duality. """ def __init__(self, verbosity : int = 10, timestamp : bool = True): self.verbosity = verbosity self.timestamp = timestamp self._affinePoly = True self.nAs, self.nbs = 1, 1 self.setSolver("SPSOLVE", {"use_umfpack" : False}) self.npar = 0 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def __dir_base__(self): return [x for x in self.__dir__() if x[:2] != "__"] def __deepcopy__(self, memo): return softcopy(self) @property def npar(self): """Value of npar.""" return self._npar @npar.setter def npar(self, npar): nparOld = self._npar if hasattr(self, "_npar") else -1 if npar != nparOld: self.rescalingExp = [1.] * npar self._npar = npar @property def nAs(self): """Value of nAs.""" return self._nAs @nAs.setter def nAs(self, nAs): self._nAs = nAs self.resetAs() @property def nbs(self): """Value of nbs.""" return self._nbs @nbs.setter def nbs(self, nbs): self._nbs = nbs self.resetbs() @property def affinePoly(self): return self._affinePoly def spacedim(self): return self.As[0].shape[1] def checkParameter(self, mu:paramVal): return checkParameter(mu, self.npar) def checkParameterList(self, mu:paramList): return checkParameterList(mu, self.npar) def buildEnergyNormForm(self): # eye """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = Np2DLikeEye(self.spacedim()) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product. """ if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() vbMng(self, "INIT", "Assembling energy dual matrix.", 20) self.energyNormDualMatrix = self.energyNormMatrix vbMng(self, "DEL", "Done assembling energy dual matrix.", 20) def buildDualityPairingForm(self): """Build sparse matrix (in CSR format) representative of duality.""" if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() vbMng(self, "INIT", "Assembling duality matrix.", 20) self.dualityMatrix = self.energyNormMatrix vbMng(self, "DEL", "Done assembling duality matrix.", 20) def buildEnergyNormPartialDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ if not hasattr(self, "energyNormDualMatrix"): self.buildEnergyNormDualForm() vbMng(self, "INIT", "Assembling energy partial dual matrix.", 20) self.energyNormPartialDualMatrix = self.energyNormDualMatrix vbMng(self, "DEL", "Done assembling energy partial dual matrix.", 20) def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False, dual : bool = False, duality : bool = True) -> Np2D: """Scalar product.""" if dual: if duality: if not hasattr(self, "energyNormDualMatrix"): self.buildEnergyNormDualForm() energyMat = self.energyNormDualMatrix else: if not hasattr(self, "energyNormPartialDualMatrix"): self.buildEnergyNormPartialDualForm() energyMat = self.energyNormPartialDualMatrix else: if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() energyMat = self.energyNormMatrix if not isinstance(u, (np.ndarray,)): u = u.data if not isinstance(v, (np.ndarray,)): v = v.data if onlyDiag: return np.sum(dot(energyMat, u) * v.conj(), axis = 0) - return dot(dot(energyMat, u).T, v.conj()) + return dot(dot(energyMat, u).T, v.conj()).T def norm(self, u:Np2D, dual : bool = False, duality : bool = True) -> Np1D: return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual, duality = duality)) ** .5 def checkAInBounds(self, derI : int = 0): """Check if derivative index is oob for operator of linear system.""" if derI < 0: d = self.spacedim() return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)), shape = (d, d), dtype = np.complex) def checkbInBounds(self, derI : int = 0): """Check if derivative index is oob for RHS of linear system.""" if derI < 0: return np.zeros(self.spacedim(), dtype = np.complex) def resetAs(self): """Reset (derivatives of) operator of linear system.""" self.setAs([None] * self.nAs) self.setthAs([None] * self.nAs) def resetbs(self): """Reset (derivatives of) RHS of linear system.""" self.setbs([None] * self.nbs) self.setthbs([None] * self.nbs) def getMonomialSingleWeight(self, deg:List[int]): return createMonomial(deg, True) def getMonomialWeights(self, n:int): return createMonomialList(n, self.npar, True) def setAs(self, As:List[Np2D]): """Assign terms of operator of linear system.""" if len(As) != self.nAs: raise RROMPyException(("Expected number {} of terms of As not " "matching given list length {}.").format(self.nAs, len(As))) self.As = [copy(A) for A in As] def setthAs(self, thAs:List[List[TupleAny]]): """Assign terms of operator of linear system.""" if len(thAs) != self.nAs: raise RROMPyException(("Expected number {} of terms of thAs not " "matching given list length {}.").format(self.nAs, len(thAs))) self.thAs = copy(thAs) def setbs(self, bs:List[Np1D]): """Assign terms of RHS of linear system.""" if len(bs) != self.nbs: raise RROMPyException(("Expected number {} of terms of bs not " "matching given list length {}.").format(self.nbs, len(bs))) self.bs = [copy(b) for b in bs] def setthbs(self, thbs:List[List[TupleAny]]): """Assign terms of RHS of linear system.""" if len(thbs) != self.nbs: raise RROMPyException(("Expected number {} of terms of thbs not " "matching given list length {}.").format(self.nbs, len(thbs))) self.thbs = copy(thbs) def _assembleObject(self, mu:paramVal, objs:ListAny, th:ListAny, derI:int) -> ScOp: """Assemble (derivative of) object from list of derivatives.""" mu = self.checkParameter(mu) rExp = self.rescalingExp muE = mu ** rExp obj = None for j in range(len(objs)): if len(th[j]) <= derI and th[j][-1] is not None: raise RROMPyException(("Cannot assemble operator. Non enough " "derivatives of theta provided.")) if len(th[j]) > derI and th[j][derI] is not None: expr = expressionEvaluator(th[j][derI], muE) if hasattr(expr, "__len__"): if len(expr) > 1: raise RROMPyException(("Size mismatch in value of " "theta function. Only scalars " "allowed.")) expr = expr[0] if obj is None: obj = expr * objs[j] else: obj = obj + expr * objs[j] return obj @abstractmethod def buildA(self): """Build terms of operator of linear system.""" if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs) for j in range(self.nAs): if self.As[j] is None: self.As[j] = self.checkAInBounds(-1) def A(self, mu : paramVal = [], der : List[int] = 0) -> ScOp: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ derI = hashD(der) if hasattr(der, "__len__") else der Anull = self.checkAInBounds(derI) if Anull is not None: return Anull self.buildA() assembledA = self._assembleObject(mu, self.As, self.thAs, derI) if assembledA is None: return self.checkAInBounds(-1) return assembledA @abstractmethod def buildb(self): """Build terms of operator of linear system.""" if self.thbs[0] is None: self.thbs = self.getMonomialWeights(self.nbs) for j in range(self.nbs): if self.bs[j] is None: self.bs[j] = self.checkbInBounds(-1) def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ derI = hashD(der) if hasattr(der, "__len__") else der bnull = self.checkbInBounds(derI) if bnull is not None: return bnull self.buildb() assembledb = self._assembleObject(mu, self.bs, self.thbs, derI) if assembledb is None: return self.checkbInBounds(-1) return assembledb def setSolver(self, solverType:str, solverArgs : DictAny = {}): """Choose solver type and parameters.""" self._solver, self._solverArgs = setupSolver(solverType, solverArgs) def solve(self, mu : paramList = [], RHS : sampList = None) -> sampList: """ Find solution of linear system. Args: mu: parameter value. RHS: RHS of linear system. If None, defaults to that of parametric system. Defaults to None. """ mu = self.checkParameterList(mu)[0] if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype) sol = emptySampleList() if len(mu) > 0: if RHS is None: RHS = [self.b(m) for m in mu] RHS = sampleList(RHS) mult = 0 if len(RHS) == 1 else 1 RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size") for j in range(len(mu)): u = solve(self.A(mu[j]), RHS[mult * j], self._solver, self._solverArgs) if j == 0: sol.reset((len(u), len(mu)), dtype = u.dtype) sol[j] = u return sol def residual(self, mu : paramList = [], u : sampList = None, duality : bool = True) -> sampList: """ Find residual of linear system for given approximate solution. Args: mu: parameter value. u: numpy complex array with function dofs. If None, set to 0. """ mu = self.checkParameterList(mu)[0] if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype) if u is not None: u = sampleList(u) mult = 0 if len(u) == 1 else 1 RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size") res = emptySampleList() if duality and not hasattr(self, "dualityMatrix"): self.buildDualityPairingForm() for j in range(len(mu)): r = self.b(mu[j]) if u is not None: r = r - dot(self.A(mu[j]), u[mult * j]) if j == 0: res.reset((len(r), len(mu)), dtype = r.dtype) if duality: r = dot(self.dualityMatrix, r) res[j] = r return res def _rayleighQuotient(self, A:Np2D, v0:Np1D, M:Np2D, sigma : float = 0., nIterP : int = 10, nIterR : int = 10) -> float: nIterP = min(nIterP, len(v0) // 2) nIterR = min(nIterR, (len(v0) + 1) // 2) v0 /= dot(dot(M, v0).T, v0.conj()) ** .5 for j in range(nIterP): v0 = solve(A - sigma * M, dot(M, v0), self._solver, self._solverArgs) v0 /= dot(dot(M, v0).T, v0.conj()) ** .5 l0 = dot(A.dot(v0).T, v0.conj()) for j in range(nIterR): v0 = solve(A - l0 * M, dot(M, v0), self._solver, self._solverArgs) v0 /= dot(dot(M, v0).T, v0.conj()) ** .5 l0 = dot(A.dot(v0).T, v0.conj()) if np.isnan(l0): l0 = np.finfo(float).eps return np.abs(l0) def stabilityFactor(self, mu : paramList = [], u : sampList = None, nIterP : int = 10, nIterR : int = 10) -> sampList: """ Find stability factor of matrix of linear system using iterative inverse power iteration- and Rayleigh quotient-based procedure. Args: mu: parameter values. u: numpy complex arrays with function dofs. nIterP: number of iterations of power method. nIterR: number of iterations of Rayleigh quotient method. """ mu = self.checkParameterList(mu)[0] if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype) u = sampleList(u) mult = 0 if len(u) == 1 else 1 RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size") stabFact = np.empty(len(mu), dtype = float) if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() for j in range(len(mu)): stabFact[j] = self._rayleighQuotient(self.A(mu[j]), u[mult * j], self.energyNormMatrix, 0., nIterP, nIterR) return stabFact def plot(self, u:Np1D, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, pyplotArgs : dict = {}, **figspecs): """ Do some nice plots of the complex-valued function with given dofs. Args: u: numpy complex array with function dofs. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. pyplotArgs(optional): Optional arguments for pyplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return if 'figsize' not in figspecs.keys(): figspecs['figsize'] = (13. * len(what) / 4, 3) subplotcode = 100 + len(what) * 10 idxs = np.arange(self.spacedim()) if warping is not None: idxs = warping[0](np.arange(self.spacedim())) plt.figure(**figspecs) plt.jet() if 'ABS' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.abs(u).flatten(), **pyplotArgs) plt.title("|{0}|".format(name)) if 'PHASE' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.angle(u).flatten(), **pyplotArgs) plt.title("phase({0})".format(name)) if 'REAL' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.real(u).flatten(), **pyplotArgs) plt.title("Re({0})".format(name)) if 'IMAG' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.imag(u).flatten(), **pyplotArgs) plt.title("Im({0})".format(name)) if save is not None: save = save.strip() plt.savefig(getNewFilename("{}_fig_".format(save), saveFormat), format = saveFormat, dpi = saveDPI) if show: plt.show() plt.close() diff --git a/rrompy/hfengines/base/problem_engine_base.py b/rrompy/hfengines/base/problem_engine_base.py index c419742..bb22b19 100644 --- a/rrompy/hfengines/base/problem_engine_base.py +++ b/rrompy/hfengines/base/problem_engine_base.py @@ -1,409 +1,410 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from os import path, mkdir import fenics as fen import numpy as np from matplotlib import pyplot as plt from rrompy.utilities.base.types import Np1D, strLst, FenFunc, Tuple, List from rrompy.utilities.base import (purgeList, getNewFilename, verbosityManager as vbMng) +from rrompy.utilities.numerical import dot from rrompy.solver import Np2DLikeEye from rrompy.solver.fenics import L2NormMatrix, fenplot, interp_project from .boundary_conditions import BoundaryConditions from .matrix_engine_base import MatrixEngineBase from rrompy.utilities.exception_manager import RROMPyException __all__ = ['ProblemEngineBase'] class ProblemEngineBase(MatrixEngineBase): """ Generic solver for parametric problems. Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real FE space. u: Generic trial functions for variational form evaluation. v: Generic test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing inner product over V'. dualityMatrix: Scipy sparse matrix representing duality V-V'. energyNormPartialDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. """ _dualityCompress = None def __init__(self, degree_threshold : int = np.inf, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): self.homogeneized = homogeneized super().__init__(verbosity = verbosity, timestamp = timestamp) self.BCManager = BoundaryConditions("Dirichlet") self.V = fen.FunctionSpace(fen.UnitSquareMesh(10, 10), "P", 1) self.degree_threshold = degree_threshold self.npar = 0 @property def homogeneized(self): """Value of homogeneized.""" return self._homogeneized @homogeneized.setter def homogeneized(self, homogeneized): if (not hasattr(self, "_homogeneized") or homogeneized != self.homogeneized): self._homogeneized = homogeneized if hasattr(self, "_nbs"): self.resetbs() @property def V(self): """Value of V.""" return self._V @V.setter def V(self, V): self.resetAs() self.resetbs() if not type(V).__name__ == 'FunctionSpace': raise RROMPyException("V type not recognized.") self._V = V self.u = fen.TrialFunction(V) self.v = fen.TestFunction(V) def spacedim(self): return self.V.dim() def autoSetDS(self): """Set FEniCS boundary measure based on boundary function handles.""" if self.dsToBeSet: vbMng(self, "INIT", "Initializing boundary measures.", 20) mesh = self.V.mesh() NB = self.NeumannBoundary RB = self.RobinBoundary boundary_markers = fen.MeshFunction("size_t", mesh, mesh.topology().dim() - 1) NB.mark(boundary_markers, 0) RB.mark(boundary_markers, 1) self.ds = fen.Measure("ds", domain = mesh, subdomain_data = boundary_markers) self.dsToBeSet = False vbMng(self, "DEL", "Done assembling boundary measures.", 20) def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = L2NormMatrix(self.V) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildDualityPairingForm(self): """Build sparse matrix (in CSR format) representative of duality.""" vbMng(self, "INIT", "Assembling duality matrix.", 20) self.dualityMatrix = Np2DLikeEye(self.spacedim()) vbMng(self, "DEL", "Done assembling duality matrix.", 20) def liftDirichletData(self) -> Np1D: """Lift Dirichlet datum.""" if not hasattr(self, "_liftedDirichletDatum"): liftRe = interp_project(self.DirichletDatum[0], self.V) liftIm = interp_project(self.DirichletDatum[1], self.V) self._liftedDirichletDatum = (np.array(liftRe.vector()) + 1.j * np.array(liftIm.vector())) return self._liftedDirichletDatum def reduceQuadratureDegree(self, fun:FenFunc, name:str): """Check whether to reduce compiler parameters to degree threshold.""" if not np.isinf(self.degree_threshold): from ufl.algorithms.estimate_degrees import ( estimate_total_polynomial_degree as ETPD) try: deg = ETPD(fun) except: return False if deg > self.degree_threshold: vbMng(self, "MAIN", ("Reducing quadrature degree from {} to {} for " "{}.").format(deg, self.degree_threshold, name), 15) return True return False def iterReduceQuadratureDegree(self, funsNames:List[Tuple[FenFunc, str]]): """ Iterate reduceQuadratureDegree over list and define reduce compiler parameters. """ if funsNames is not None: for fun, name in funsNames: if self.reduceQuadratureDegree(fun, name): return {"quadrature_degree" : self.degree_threshold} return {} def setbHomogeneized(self): if not self.homogeneized: return uLifted = None for j in range(len(self.bs) - self.nbs - 1, -1, -1): if self.bs[self.nbs + j] is None: if uLifted is None: uLifted = - self.liftDirichletData() if self.As[j] is None: self.buildA() vbMng(self, "INIT", "Assembling forcing term bH{}.".format(j), 20) - bH = self.As[j].dot(uLifted) + bH = dot(self.As[j], uLifted) thbH = self.thAs[j] for i in range(self.nbs): if self.thbs[i][0] == thbH[0]: self.bs[i] = self.bs[i] + bH _ = self.bs.pop(self.nbs + j) _ = self.thbs.pop(self.nbs + j) break else: self.bs[self.nbs + j], self.thbs[self.nbs + j] = bH, thbH vbMng(self, "DEL", "Done assembling forcing term.", 20) def plot(self, u:Np1D, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, fenplotArgs : dict = {}, **figspecs): """ Do some nice plots of the complex-valued function with given dofs. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. fenplotArgs(optional): Optional arguments for fenplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return if 'figsize' not in figspecs.keys(): figspecs['figsize'] = (13. * len(what) / 4, 3) subplotcode = 100 + len(what) * 10 plt.figure(**figspecs) plt.jet() if 'ABS' in what: uAb = fen.Function(self.V) uAb.vector().set_local(np.abs(u)) subplotcode = subplotcode + 1 plt.subplot(subplotcode) p = fenplot(uAb, warping = warping, title = "|{0}|".format(name), **fenplotArgs) if self.V.mesh().geometric_dimension() > 1: plt.colorbar(p) if 'PHASE' in what: uPh = fen.Function(self.V) uPh.vector().set_local(np.angle(u)) subplotcode = subplotcode + 1 plt.subplot(subplotcode) p = fenplot(uPh, warping = warping, title = "phase({0})".format(name), **fenplotArgs) if self.V.mesh().geometric_dimension() > 1: plt.colorbar(p) if 'REAL' in what: uRe = fen.Function(self.V) uRe.vector().set_local(np.real(u)) subplotcode = subplotcode + 1 plt.subplot(subplotcode) p = fenplot(uRe, warping = warping, title = "Re({0})".format(name), **fenplotArgs) if self.V.mesh().geometric_dimension() > 1: plt.colorbar(p) if 'IMAG' in what: uIm = fen.Function(self.V) uIm.vector().set_local(np.imag(u)) subplotcode = subplotcode + 1 plt.subplot(subplotcode) p = fenplot(uIm, warping = warping, title = "Im({0})".format(name), **fenplotArgs) if self.V.mesh().geometric_dimension() > 1: plt.colorbar(p) if save is not None: save = save.strip() plt.savefig(getNewFilename("{}_fig_".format(save), saveFormat), format = saveFormat, dpi = saveDPI) if show: plt.show() plt.close() def plotmesh(self, warping : List[callable] = None, name : str = "Mesh", save : str = None, saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, fenplotArgs : dict = {}, **figspecs): """ Do a nice plot of the mesh. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. fenplotArgs(optional): Optional arguments for fenplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ plt.figure(**figspecs) fenplot(self.V.mesh(), warping = warping, **fenplotArgs) if save is not None: save = save.strip() plt.savefig(getNewFilename("{}_msh_".format(save), saveFormat), format = saveFormat, dpi = saveDPI) if show: plt.show() plt.close() def outParaview(self, u:Np1D, warping : List[callable] = None, name : str = "u", filename : str = "out", time : float = 0., what : strLst = 'all', forceNewFile : bool = True, folder : bool = False, filePW = None): """ Output complex-valued function with given dofs to ParaView file. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. folder(optional): Whether to create an additional folder layer. filePW(optional): Fenics File entity (for time series). """ if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return if filePW is None: if folder: if not path.exists(filename + "/"): mkdir(filename) idxpath = filename.rfind("/") filename += "/" + filename[idxpath + 1 :] if forceNewFile: filePW = fen.File(getNewFilename(filename, "pvd")) else: filePW = fen.File("{}.pvd".format(filename)) if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[0], self.V.mesh())) if what == ['MESH']: filePW << (self.V.mesh(), time) if 'ABS' in what: uAb = fen.Function(self.V, name = "{}_ABS".format(name)) uAb.vector().set_local(np.abs(u)) filePW << (uAb, time) if 'PHASE' in what: uPh = fen.Function(self.V, name = "{}_PHASE".format(name)) uPh.vector().set_local(np.angle(u)) filePW << (uPh, time) if 'REAL' in what: uRe = fen.Function(self.V, name = "{}_REAL".format(name)) uRe.vector().set_local(np.real(u)) filePW << (uRe, time) if 'IMAG' in what: uIm = fen.Function(self.V, name = "{}_IMAG".format(name)) uIm.vector().set_local(np.imag(u)) filePW << (uIm, time) if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[1], self.V.mesh())) return filePW def outParaviewTimeDomain(self, u:Np1D, omega:float, warping : List[callable] = None, timeFinal : float = None, periodResolution : int = 20, name : str = "u", filename : str = "out", forceNewFile : bool = True, folder : bool = False): """ Output complex-valued function with given dofs to ParaView file, converted to time domain. Args: u: numpy complex array with function dofs. omega: frequency. warping(optional): Domain warping functions. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. folder(optional): Whether to create an additional folder layer. """ if folder: if not path.exists(filename + "/"): mkdir(filename) idxpath = filename.rfind("/") filename += "/" + filename[idxpath + 1 :] if forceNewFile: filePW = fen.File(getNewFilename(filename, "pvd")) else: filePW = fen.File("{}.pvd".format(filename)) omega = np.abs(omega) t = 0. dt = 2. * np.pi / omega / periodResolution if timeFinal is None: timeFinal = 2. * np.pi / omega - dt if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[0], self.V.mesh())) for j in range(int(np.ceil(timeFinal / dt)) + 1): ut = fen.Function(self.V, name = name) ut.vector().set_local(np.real(u) * np.cos(omega * t) + np.imag(u) * np.sin(omega * t)) filePW << (ut, t) t += dt if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[1], self.V.mesh())) return filePW diff --git a/rrompy/reduction_methods/base/reduced_basis_utils.py b/rrompy/reduction_methods/base/reduced_basis_utils.py index 37e8665..5ebe3b9 100644 --- a/rrompy/reduction_methods/base/reduced_basis_utils.py +++ b/rrompy/reduction_methods/base/reduced_basis_utils.py @@ -1,67 +1,67 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, ListAny, sampList) -from rrompy.utilities.numerical import hashIdxToDerivative as hashI +from rrompy.utilities.numerical import hashIdxToDerivative as hashI, dot from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.sampling import sampleList __all__ = ['projectAffineDecomposition'] def projectAffineDecomposition(As:List[Np2D], bs:List[Np1D], pMat:sampList, ARBsOld : List[Np2D] = None, bRBsOld : List[Np1D] = None, pMatOld : sampList = None)\ -> Tuple[List[Np2D], List[Np1D]]: """Project affine decomposition of linear system onto basis.""" RROMPyAssert((ARBsOld is None, bRBsOld is None), (pMatOld is None, pMatOld is None), "Old affine projected terms") if isinstance(pMat, (sampleList,)): pMat = pMat.data - pMatH = pMat.T.conj() + pMatC = pMat.conj() ARBs = [None] * len(As) bRBs = [None] * len(bs) if pMatOld is None: for j in range(max(len(As), len(bs))): if j < len(As): - ARBs[j] = pMatH.dot(As[j].dot(pMat)) + ARBs[j] = dot(dot(As[j], pMat).T, pMatC).T if j < len(bs): - bRBs[j] = pMatH.dot(bs[j]) + bRBs[j] = dot(pMatC.T, bs[j]) else: RROMPyAssert((len(ARBsOld), len(bRBsOld)), (len(As), len(bs)), "Old affine projected terms") if isinstance(pMatOld, (sampleList,)): pMatOld = pMatOld.data - pMatOldH = pMatOld.T.conj() + pMatOldC = pMatOld.conj() Sold = pMatOld.shape[1] Snew = pMat.shape[1] for j in range(max(len(As), len(bs))): if j < len(As): ARBs[j] = np.empty((Sold + Snew, Sold + Snew), dtype = ARBsOld[j].dtype) ARBs[j][: Sold, : Sold] = ARBsOld[j] - ARBs[j][: Sold, Sold :] = pMatOldH.dot(As[j].dot(pMat)) - ARBs[j][Sold :, : Sold] = pMatH.dot(As[j].dot(pMatOld)) - ARBs[j][Sold :, Sold :] = pMatH.dot(As[j].dot(pMat)) + ARBs[j][: Sold, Sold :] = dot(dot(As[j], pMat).T, pMatOldC).T + ARBs[j][Sold :, : Sold] = dot(dot(As[j], pMatOld).T, pMatC).T + ARBs[j][Sold :, Sold :] = dot(dot(As[j], pMat).T, pMatC).T if j < len(bs): bRBs[j] = np.empty((Sold + Snew), dtype = bRBsOld[j].dtype) bRBs[j][: Sold] = bRBsOld[j] - bRBs[j][Sold :] = pMatH.dot(bs[j]) + bRBs[j][Sold :] = dot(pMatC.T, bs[j]) return ARBs, bRBs diff --git a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py index 8ff5904..9f7ce60 100644 --- a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py +++ b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py @@ -1,671 +1,672 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.standard.generic_standard_approximant \ import GenericStandardApproximant from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple, List, normEng, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng +from rrompy.utilities.numerical import dot from rrompy.utilities.expression import expressionEvaluator from rrompy.solver import normEngine from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList, emptyParameterList __all__ = ['GenericGreedyApproximant'] def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D: return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)]) - badmus[..., np.newaxis].T, axis = 1) def pruneSamples(mus:paramList, badmus:paramList, tol : float = 1e-8) -> Np1D: """Remove from mus all the elements which are too close to badmus.""" if len(badmus) == 0: return mus proximity = np.min(localL2Distance(mus.data, badmus.data), axis = 1) return np.arange(len(mus))[proximity <= tol] class GenericGreedyApproximant(GenericStandardApproximant): """ ROM greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: Uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ TOL_INSTABILITY = 1e-6 def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["greedyTol", "collinearityTol", "interactive", "maxIter", "refinementRatio", "nTestPoints"], [1e-2, 0., False, 1e2, .2, 5e2], ["trainSetGenerator"], ["AUTO"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def collinearityTol(self): """Value of collinearityTol.""" return self._collinearityTol @collinearityTol.setter def collinearityTol(self, collinearityTol): if collinearityTol < 0: raise RROMPyException("collinearityTol must be non-negative.") if (hasattr(self, "_collinearityTol") and self.collinearityTol is not None): collinearityTolold = self.collinearityTol else: collinearityTolold = -1 self._collinearityTol = collinearityTol self._approxParameters["collinearityTol"] = self.collinearityTol if collinearityTolold != self.collinearityTol: self.resetSamples() @property def interactive(self): """Value of interactive.""" return self._interactive @interactive.setter def interactive(self, interactive): self._interactive = interactive @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def refinementRatio(self): """Value of refinementRatio.""" return self._refinementRatio @refinementRatio.setter def refinementRatio(self, refinementRatio): if refinementRatio <= 0. or refinementRatio > 1.: raise RROMPyException(("refinementRatio must be between 0 " "(excluded) and 1.")) if (hasattr(self, "_refinementRatio") and self.refinementRatio is not None): refinementRatioold = self.refinementRatio else: refinementRatioold = -1 self._refinementRatio = refinementRatio self._approxParameters["refinementRatio"] = self.refinementRatio if refinementRatioold != self.refinementRatio: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def trainSetGenerator(self): """Value of trainSetGenerator.""" return self._trainSetGenerator @trainSetGenerator.setter def trainSetGenerator(self, trainSetGenerator): if (isinstance(trainSetGenerator, (str,)) and trainSetGenerator.upper() == "AUTO"): trainSetGenerator = self.sampler if 'generatePoints' not in dir(trainSetGenerator): raise RROMPyException("trainSetGenerator type not recognized.") if (hasattr(self, '_trainSetGenerator') and self.trainSetGenerator not in [None, "AUTO"]): trainSetGeneratorOld = self.trainSetGenerator self._trainSetGenerator = trainSetGenerator self._approxParameters["trainSetGenerator"] = self.trainSetGenerator if (not 'trainSetGeneratorOld' in locals() or trainSetGeneratorOld != self.trainSetGenerator): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = emptyParameterList() def initEstimatorNormEngine(self, normEngn : normEng = None): """Initialize estimator norm engine.""" if (normEngn is not None or not hasattr(self, "estimatorNormEngine") or self.estimatorNormEngine is None): if normEngn is None: if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"): self.HFEngine.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( self.HFEngine.energyNormPartialDualMatrix) else: if hasattr(normEngn, "buildEnergyNormPartialDualForm"): if not hasattr(normEngn, "energyNormPartialDualMatrix"): normEngn.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( normEngn.energyNormPartialDualMatrix) else: estimatorEnergyMatrix = normEngn self.estimatorNormEngine = normEngine(estimatorEnergyMatrix) def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \ -> Tuple[Np1D, Np1D, Np1D]: self.assembleReducedResidualBlocks(full = True) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0) if rA is None: return ff # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2) * rb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2) * rA.conj(), axis = (0, 1)) return ff, Lf, LL def errorEstimator(self, mus:Np1D) -> Np1D: """Standard residual-based error estimator.""" self.setupApprox() mus = checkParameterList(mus, self.npar)[0] vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 uApproxRs = self.getApproxReduced(mus) muTestEff = mus ** self.HFEngine.rescalingExp radiusA = np.empty((len(self.HFEngine.thAs), len(mus)), dtype = np.complex) radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): radiusA[j] = expressionEvaluator(thA[0], muTestEff) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) radiusA = np.expand_dims(uApproxRs.data, 1) * radiusA ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) return err def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: """ Compute maximum of (and index of maximum of) error estimator over given parameters. """ errorEstTest = self.errorEstimator(mus) idxMaxEst = [np.argmax(errorEstTest)] return errorEstTest, idxMaxEst, errorEstTest[idxMaxEst] def _isLastSampleCollinear(self) -> bool: """Check collinearity of last sample.""" if self.collinearityTol <= 0.: return False if self.POD: reff = self.samplingEngine.RPOD[:, -1] else: RROMPyWarning(("Repeated orthogonalization of the samples for " "collinearity check. Consider setting POD to " "True.")) if not hasattr(self, "_PODEngine"): from rrompy.sampling.base.pod_engine import PODEngine self._PODEngine = PODEngine(self.HFEngine) reff = self._PODEngine.generalizedQR(self.samplingEngine.samples, only_R = True)[:, -1] cLevel = np.abs(reff[-1]) / np.linalg.norm(reff) vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 5) return cLevel < self.collinearityTol def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") mus = copy(self.muTest[muidx]) self.muTest.pop(muidx) for mu in mus: vbMng(self, "MAIN", ("Adding sample point no. {} at {} to training " "set.").format(self.samplingEngine.nsamples + 1, mu), 2) self.mus.append(mu) self.samplingEngine.nextSample(mu) if self._isLastSampleCollinear(): RROMPyWarning("Collinearity above tolerance detected.") errorEstTest = np.empty(len(self.muTest)) errorEstTest[:] = np.nan return errorEstTest, [-1], np.nan, np.nan errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator( self.muTest) if (plotEst and not np.any(np.isnan(errorEstTest)) and not np.any(np.isinf(errorEstTest))): musre = copy(self.muTest.re.data) from matplotlib import pyplot as plt plt.figure() errCP = copy(errorEstTest) while len(musre) > 0: if self.npar == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, 1 :] - musre[0, 1 :]), 1), 0.))[0] plt.semilogy(musre[currIdx, 0], errCP[currIdx], 'k', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) plt.semilogy([self.muTest.re(0, 0), self.muTest.re(-1, 0)], [self.greedyTol] * 2, 'r--') plt.semilogy(self.mus.re(0), 2. * self.greedyTol * np.ones(len(self.mus)), '*m') plt.semilogy(self.muTest.re(muidx, 0), maxErrorEst, 'xr') plt.grid() plt.show() plt.close() return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self.computeScaleFactor() self.resetSamples() self.mus = self.trainSetGenerator.generatePoints(self.S)[ list(range(self.S))] muTestBase = self.sampler.generatePoints(self.nTestPoints) idxPop = pruneSamples(muTestBase ** self.HFEngine.rescalingExp, self.mus ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestBase.pop(idxPop) muTestBase = muTestBase.sort() muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 2) self.samplingEngine.iterSample(self.mus) self.muTest = emptyParameterList() self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) self.muTest[: -1] = muTestBase.data self.muTest[-1] = muLast.data def _enrichTestSet(self, nTest:int): """Add extra elements to test set.""" RROMPyAssert(self._mode, message = "Cannot enrich test set.") muTestExtra = self.sampler.generatePoints(2 * nTest) muTotal = copy(self.mus) muTotal.append(self.muTest) idxPop = pruneSamples(muTestExtra ** self.HFEngine.rescalingExp, muTotal ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestExtra.pop(idxPop) muTestNew = np.empty((len(self.muTest) + len(muTestExtra), self.muTest.shape[1]), dtype = np.complex) muTestNew[: len(self.muTest)] = self.muTest.data muTestNew[len(self.muTest) :] = muTestExtra.data self.muTest = checkParameterList(muTestNew, self.npar)[0].sort() vbMng(self, "MAIN", "Enriching test set by {} elements.".format(len(muTestExtra)), 5) def greedy(self, plotEst : bool = False): """Compute greedy snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return vbMng(self, "INIT", "Starting computation of snapshots.", 2) self._preliminaryTraining() nTest = self.nTestPoints muT0 = copy(self.muTest[-1]) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( [len(self.muTest) - 1], plotEst) if np.any(np.isnan(maxErrorEst)): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop termination.")) self.muTest.append(muT0) self.mus.pop(-1) self.samplingEngine.popSample() self.setupApprox() else: vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(np.max(maxErrorEst)), 2) trainedModelOld = copy(self.trainedModel) while (self.samplingEngine.nsamples < self.maxIter and np.max(maxErrorEst) > self.greedyTol): if (1. - self.refinementRatio) * nTest > len(self.muTest): self._enrichTestSet(nTest) nTest = len(self.muTest) muTestOld, maxErrorEstOld = self.muTest, np.max(maxErrorEst) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( muidx, plotEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(np.max(maxErrorEst)), 2) if (np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst)) or maxErrorEstOld < (np.max(maxErrorEst) * self.TOL_INSTABILITY)): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop " "termination.")) self.muTest = muTestOld self.mus.pop(-1) self.samplingEngine.popSample() self.trainedModel.data = copy(trainedModelOld.data) break trainedModelOld.data = copy(self.trainedModel.data) if (self.interactive and np.max(maxErrorEst) <= self.greedyTol): vbMng(self, "MAIN", ("Required tolerance {} achieved. Want to decrease " "greedyTol and continue? " "Y/N").format(self.greedyTol), 0, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": vbMng(self, "MAIN", "Reducing value of greedyTol...", 0) while np.max(maxErrorEst) <= self._greedyTol: self._greedyTol *= .5 if (self.interactive and self.samplingEngine.nsamples >= self.maxIter): vbMng(self, "MAIN", ("Maximum number of iterations {} reached. Want to " "increase maxIter and continue? " "Y/N").format(self.maxIter), 0, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": vbMng(self, "MAIN", "Doubling value of maxIter...", 0) self._maxIter *= 2 vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(self.samplingEngine.nsamples), 2) def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return (super().checkComputedApprox() and len(self.mus) == self.trainedModel.data.projMat.shape[1]) def assembleReducedResidualGramian(self, pMat:sampList): """ Build residual gramian of reduced linear system through projections. """ self.initEstimatorNormEngine() if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.estimatorNormEngine.innerProduct(pMat, pMat) else: Sold = self.trainedModel.data.gramian.shape[0] S = len(self.mus) if Sold > S: gramian = self.trainedModel.data.gramian[: S, : S] else: idxOld = list(range(Sold)) idxNew = list(range(Sold, S)) gramian = np.empty((S, S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxOld))) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxNew))) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D]): """ Build blocks (of type bb) of reduced linear system through projections. """ self.initEstimatorNormEngine() nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = bs[i] resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi) for j in range(i): Mbj = bs[j] resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj, Mbi) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:sampList): """ Build blocks (of type Ab) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) nbs = len(bs) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) for j in range(nAs): - MAj = As[j].dot(pMat) + MAj = dot(As[j], pMat) for i in range(nbs): Mbi = bs[i] resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj, Mbi) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == S: return if Sold > S: resAb = self.trainedModel.data.resAb[:, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): - MAj = As[j].dot(pMat[:, Sold :]) + MAj = dot(As[j], pMat[:, Sold :]) for i in range(nbs): Mbi = bs[i] resAb[i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj, Mbi)) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList): """ Build blocks (of type AA) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) for i in range(nAs): - MAi = As[i].dot(pMat) + MAi = dot(As[i], pMat) resAA[:, i, :, i] = ( self.estimatorNormEngine.innerProduct(MAi, MAi)) for j in range(i): - MAj = As[j].dot(pMat) + MAj = dot(As[j], pMat) resAA[:, i, :, j] = ( self.estimatorNormEngine.innerProduct(MAj, MAi)) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == S: return if Sold > S: resAA = self.trainedModel.data.resAA[: S, :, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): - MAi = As[i].dot(pMat) + MAi = dot(As[i], pMat) resAA[: Sold, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) for j in range(i): - MAj = As[j].dot(pMat) + MAj = dot(As[j], pMat) resAA[: Sold, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, : Sold], MAi[:, Sold :])) resAA[Sold :, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, Sold :])) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of affine decomposition of residual.""" self.assembleReducedResidualBlocksbb(self.HFEngine.bs) if full: pMat = self.trainedModel.data.projMat self.assembleReducedResidualBlocksAb(self.HFEngine.As, self.HFEngine.bs, pMat) self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat) diff --git a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py index 95da4a7..fccec16 100644 --- a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py +++ b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py @@ -1,624 +1,624 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from .generic_pivoted_approximant import GenericPivotedApproximant from rrompy.reduction_methods.standard.rational_interpolant import ( RationalInterpolant as RI) from rrompy.utilities.poly_fitting.polynomial import ( polybases as ppb, polyfitname, polyvander as pvP, polyvanderTotal as pvTP, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( polybases as mlspb, MovingLeastSquaresInterpolator as MLSI) from rrompy.reduction_methods.trained_model import (TrainedModelPivotedData, TrainedModelPivotedRational as tModel) from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple, List, ListAny, paramVal) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp -from rrompy.utilities.numerical import (multifactorial, customPInv, +from rrompy.utilities.numerical import (multifactorial, customPInv, dot, fullDegreeN, totalDegreeN, degreeTotalToFull, fullDegreeMaxMask, totalDegreeMaxMask, nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameter __all__ = ['RationalInterpolantPivoted'] class RationalInterpolantPivoted(GenericPivotedApproximant): """ ROM pivoted rational interpolant (with pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'cutOffTolerance': tolerance for ignoring parasitic poles; defaults to np.inf; - 'cutOffType': rule for tolerance computation for parasitic poles; defaults to 'MAGNITUDE'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasisPivot': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'MMarginal': degree of marginal interpolant; defaults to 0; - 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; - 'radialDirectionalWeightsPivot': radial basis weights for pivot numerator; defaults to 0, i.e. identity; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 0, i.e. identity; - 'nNearestNeighborPivot': number of pivot nearest neighbors considered if polybasisPivot allows; defaults to -1; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; defaults to -1; - 'interpRcondPivot': tolerance for pivot interpolation; defaults to None; - 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'matchingWeight': weight for pole matching optimization; - 'cutOffTolerance': tolerance for ignoring parasitic poles; - 'cutOffType': rule for tolerance computation for parasitic poles; - 'polybasisPivot': type of polynomial basis for pivot interpolation; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'MMarginal': degree of marginal interpolant; - 'polydegreetypeMarginal': type of polynomial degree for marginal; - 'radialDirectionalWeightsPivot': radial basis weights for pivot numerator; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'nNearestNeighborPivot': number of pivot nearest neighbors considered if polybasisPivot allows; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; - 'interpRcondPivot': tolerance for pivot interpolation; - 'interpRcondMarginal': tolerance for marginal interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. POD: Whether to compute POD of snapshots. matchingWeight: Weight for pole matching optimization. cutOffTolerance: Tolerance for ignoring parasitic poles. cutOffType: Rule for tolerance computation for parasitic poles. S: Total number of pivot samples current approximant relies upon. sampler: Pivot sample point generator. polybasisPivot: Type of polynomial basis for pivot interpolation. polybasisMarginal: Type of polynomial basis for marginal interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. MMarginal: Degree of marginal interpolant. polydegreetypeMarginal: Type of polynomial degree for marginal. radialDirectionalWeightsPivot: Radial basis weights for pivot numerator. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. nNearestNeighborPivot: Number of pivot nearest neighbors considered if polybasisPivot allows. nNearestNeighborMarginal: Number of marginal nearest neighbors considered if polybasisMarginal allows. interpRcondPivot: Tolerance for pivot interpolation. interpRcondMarginal: Tolerance for marginal interpolation. robustTol: Tolerance for robust rational denominator management. muBoundsPivot: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, directionPivot : ListAny = [0], approxParameters : DictAny = {}, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["polybasisPivot", "M", "N", "polydegreetype", "radialDirectionalWeightsPivot", "nNearestNeighborPivot", "interpRcondPivot", "robustTol"], ["MONOMIAL", 0, 0, "TOTAL", 1, -1, -1, 0]) super().__init__(HFEngine = HFEngine, mu0 = mu0, directionPivot = directionPivot, approxParameters = approxParameters, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def polybasisPivot(self): """Value of polybasisPivot.""" return self._polybasisPivot @polybasisPivot.setter def polybasisPivot(self, polybasisPivot): try: polybasisPivot = polybasisPivot.upper().strip().replace(" ","") if polybasisPivot not in ppb + rbpb + mlspb: raise RROMPyException( "Prescribed pivot polybasis not recognized.") self._polybasisPivot = polybasisPivot except: RROMPyWarning(("Prescribed pivot polybasis not recognized. " "Overriding to 'MONOMIAL'.")) self._polybasisPivot = "MONOMIAL" self._approxParameters["polybasisPivot"] = self.polybasisPivot @property def polybasisPivot0(self): if "_" in self.polybasisPivot: return self.polybasisPivot.split("_")[0] return self.polybasisPivot @property def radialDirectionalWeightsPivot(self): """Value of radialDirectionalWeightsPivot.""" return self._radialDirectionalWeightsPivot @radialDirectionalWeightsPivot.setter def radialDirectionalWeightsPivot(self, radialDirectionalWeightsPivot): self._radialDirectionalWeightsPivot = radialDirectionalWeightsPivot self._approxParameters["radialDirectionalWeightsPivot"] = ( self.radialDirectionalWeightsPivot) @property def nNearestNeighborPivot(self): """Value of nNearestNeighborPivot.""" return self._nNearestNeighborPivot @nNearestNeighborPivot.setter def nNearestNeighborPivot(self, nNearestNeighborPivot): self._nNearestNeighborPivot = nNearestNeighborPivot self._approxParameters["nNearestNeighborPivot"] = ( self.nNearestNeighborPivot) @property def interpRcondPivot(self): """Value of interpRcondPivot.""" return self._interpRcondPivot @interpRcondPivot.setter def interpRcondPivot(self, interpRcondPivot): self._interpRcondPivot = interpRcondPivot self._approxParameters["interpRcondPivot"] = self.interpRcondPivot @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N @property def polydegreetype(self): """Value of polydegreetype.""" return self._polydegreetype @polydegreetype.setter def polydegreetype(self, polydegreetype): try: polydegreetype = polydegreetype.upper().strip().replace(" ","") if polydegreetype not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetype not " "recognized.")) self._polydegreetype = polydegreetype except: RROMPyWarning(("Prescribed polydegreetype not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetype = "TOTAL" self._approxParameters["polydegreetype"] = self.polydegreetype @property def robustTol(self): """Value of tolerance for robust rational denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol def resetSamples(self): """Reset samples.""" super().resetSamples() self._musPUniqueCN = None self._derPIdxs = None self._reorderP = None def _setupPivotInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if (self._musPUniqueCN is None or len(self._reorderP) != len(self.musPivot)): self._musPUniqueCN, musPIdxsTo, musPIdxs, musPCount = ( self.trainedModel.centerNormalizePivot(self.musPivot).unique( return_index = True, return_inverse = True, return_counts = True)) self._musPUnique = self.mus[musPIdxsTo] self._derPIdxs = [None] * len(self._musPUniqueCN) self._reorderP = np.empty(len(musPIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musPCount): self._derPIdxs[j] = nextDerivativeIndices([], self.nparPivot, cnt) jIdx = np.nonzero(musPIdxs == j)[0] self._reorderP[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator.", 7) NinvD = None N0 = copy(self.N) qs = [] self.verbosity -= 10 for j in range(len(self.musMarginal)): self._N = N0 while self.N > 0: if NinvD != self.N: invD, fitinvP = self._computeInterpolantInverseBlocks() NinvD = self.N if self.POD: ev, eV = RI.findeveVGQR(self, self.samplingEngine.RPOD[j], invD) else: ev, eV = RI.findeveVGExplicit(self, self.samplingEngine.samples[j], invD) nevBad = checkRobustTolerance(ev, self.robustTol) if nevBad <= 1: break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: eigenproblem is " "poorly conditioned.")) RROMPyWarning(("Smallest {} eigenvalues below tolerance. " "Reducing N by 1.").format(nevBad)) self.N = self.N - 1 if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) q = PI() q.npar = self.nparPivot q.polybasis = self.polybasisPivot0 if self.polydegreetype == "TOTAL": q.coeffs = degreeTotalToFull(tuple([self.N + 1] * q.npar), q.npar, eV[:, 0]) else: q.coeffs = eV[:, 0].reshape([self.N + 1] * q.npar) qs = qs + [copy(q)] self.verbosity += 10 vbMng(self, "DEL", "Done computing denominator.", 7) return qs, fitinvP def _setupNumerator(self): """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of numerator.", 7) Qevaldiag = np.zeros((len(self.musPivot), len(self.musPivot)), dtype = np.complex) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 self._setupPivotInterpolationIndices() cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN M = copy(self.M) while len(self.musPivot) < cfun(M, self.nparPivot): M -= 1 if M < self.M: RROMPyWarning(("M too large compared to S. Reducing M by " "{}").format(self.M - M)) self.M = M tensor_idx = 0 ps = [] for k, muM in enumerate(self.musMarginal): self._M = M idxGlob = 0 for j, derIdxs in enumerate(self._derPIdxs): mujEff = [fp] * self.npar for jj, kk in enumerate(self.directionPivot): mujEff[kk] = self._musPUnique[j, jj] for jj, kk in enumerate(self.directionMarginal): mujEff[kk] = muM(0, jj) mujEff = checkParameter(mujEff, self.npar) nder = len(derIdxs) idxLoc = np.arange(len(self.musPivot))[ (self._reorderP >= idxGlob) * (self._reorderP < idxGlob + nder)] idxGlob += nder Qval = [0] * nder for der in range(nder): derIdx = hashI(der, self.nparPivot) derIdxEff = [0] * self.npar sclEff = [0] * self.npar for jj, kk in enumerate(self.directionPivot): derIdxEff[kk] = derIdx[jj] sclEff[kk] = self.scaleFactorPivot[jj] ** -1. Qval[der] = (self.trainedModel.getQVal(mujEff, derIdxEff, scl = sclEff) / multifactorial(derIdx)) for derU, derUIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)] if all([x >= 0 for x in diffIdx]): diffj = hashD(diffIdx) Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj] while self.M >= 0: if self.polybasisPivot in ppb: p = PI() wellCond, msg = p.setupByInterpolation( self._musPUniqueCN, Qevaldiag, self.M, self.polybasisPivot, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derPIdxs, "reorder": self._reorderP, "scl": np.power(self.scaleFactorPivot, -1.)}, {"rcond": self.interpRcondPivot}) elif self.polybasisPivot in rbpb: p = RBI() wellCond, msg = p.setupByInterpolation( self._musPUniqueCN, Qevaldiag, self.M, self.polybasisPivot, self.radialDirectionalWeightsPivot, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derPIdxs, "reorder": self._reorderP, "scl": np.power(self.scaleFactorPivot, -1.), "nNearestNeighbor" : self.nNearestNeighborPivot}, {"rcond": self.interpRcondPivot}) else:# if self.polybasisPivot in mlspb: p = MLSI() wellCond, msg = p.setupByInterpolation( self._musPUniqueCN, Qevaldiag, self.M, self.polybasisPivot, self.radialDirectionalWeightsPivot, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derPIdxs, "reorder": self._reorderP, "scl": np.power(self.scaleFactorPivot, -1.), "nNearestNeighbor" : self.nNearestNeighborPivot}) vbMng(self, "MAIN", msg, 5) if wellCond: break if self.catchInstability: raise RROMPyException(("Instability in numerator " "computation: polyfit is " "poorly conditioned.")) RROMPyWarning(("Polyfit is poorly conditioned. " "Reducing M by 1.")) self.M = self.M - 1 tensor_idx_new = tensor_idx + Qevaldiag.shape[1] if self.POD: p.postmultiplyTensorize(self.samplingEngine.RPODCoalesced.T[ tensor_idx : tensor_idx_new, :]) else: p.pad(tensor_idx, len(self.mus) - tensor_idx_new) tensor_idx = tensor_idx_new ps = ps + [copy(p)] self.trainedModel.verbosity = verb vbMng(self, "DEL", "Done computing numerator.", 7) return ps def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelPivotedData(self.trainedModel.name(), self.mu0, self.samplingEngine.samplesCoalesced, self.scaleFactor, self.HFEngine.rescalingExp, self.directionPivot) data.musPivot = copy(self.musPivot) data.musMarginal = copy(self.musMarginal) self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy( self.samplingEngine.samplesCoalesced) self.trainedModel.data.marginalInterp = self._setupMarginalInterp() if self.N > 0: Qs = self._setupDenominator()[0] else: Q = PI() Q.npar = self.nparPivot Q.coeffs = np.ones(tuple([1] * Q.npar), dtype = self.musMarginal.dtype) Q.polybasis = self.polybasisPivot0 Qs = [Q for _ in range(len(self.musMarginal))] self.trainedModel.data._temporary = True self.trainedModel.data.Qs = Qs self.trainedModel.data.Ps = self._setupNumerator() vbMng(self, "INIT", "Matching poles.", 10) self.trainedModel.initializeFromRational(self.HFEngine, self.matchingWeight, self.POD) vbMng(self, "DEL", "Done matching poles.", 10) if not np.isinf(self.cutOffTolerance): vbMng(self, "INIT", "Recompressing by cut-off.", 10) msg = self.trainedModel.recompressByCutOff([-1., 1.], self.cutOffTolerance, self.cutOffType) vbMng(self, "DEL", "Done recompressing." + msg, 10) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupPivotInterpolationIndices() cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN N = copy(self.N) while len(self.musPivot) < cfun(N, self.nparPivot): N -= 1 if N < self.N: RROMPyWarning(("N too large compared to S. Reducing N by " "{}").format(self.N - N)) self.N = N while self.N >= 0: if self.polydegreetype == "TOTAL": TE, _, argIdxs = pvTP(self._musPUniqueCN, self.N, self.polybasisPivot0, self._derPIdxs, self._reorderP, scl = np.power(self.scaleFactorPivot, -1.)) TE = TE[:, argIdxs] idxsB = totalDegreeMaxMask(self.N, self.nparPivot) else: #if self.polydegreetype == "FULL": TE = pvP(self._musPUniqueCN, [self.N] * self.nparPivot, self.polybasisPivot0, self._derPIdxs, self._reorderP, scl = np.power(self.scaleFactorPivot, -1.)) idxsB = fullDegreeMaxMask(self.N, self.nparPivot) fitOut = customPInv(TE, rcond = self.interpRcondPivot, full = True) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... " "Conditioning of pseudoinverse system: {:.4e}.").format( TE.shape[0], self.N, polyfitname(self.polybasisPivot0), fitOut[1][1][0] / fitOut[1][1][-1]), 5) if fitOut[1][0] == TE.shape[1]: fitinvP = fitOut[0][idxsB, :] break RROMPyWarning("Polyfit is poorly conditioned. Reducing N by 1.") self.N -= 1 if self.N < 0: raise RROMPyException(("Instability in computation of " "denominator. Aborting.")) TN, _, argIdxs = pvTP(self._musPUniqueCN, self.N, self.polybasisPivot0, self._derPIdxs, self._reorderP, scl = np.power(self.scaleFactorPivot, -1.)) TN = TN[:, argIdxs] invD = [None] * (len(idxsB)) for k in range(len(idxsB)): pseudoInv = np.diag(fitinvP[k, :]) idxGlob = 0 for j, derIdxs in enumerate(self._derPIdxs): nder = len(derIdxs) idxGlob += nder if nder > 1: idxLoc = np.arange(len(self.musPivot))[ (self._reorderP >= idxGlob - nder) * (self._reorderP < idxGlob)] invLoc = fitinvP[k, idxLoc] pseudoInv[np.ix_(idxLoc, idxLoc)] = 0. for diffj, diffjIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): derUIdx = [x - y for (x, y) in zip(diffjIdx, derQIdx)] if all([x >= 0 for x in derUIdx]): derU = hashD(derUIdx) pseudoInv[idxLoc[derU], idxLoc[derQ]] = ( invLoc[diffj]) - invD[k] = pseudoInv.dot(TN) + invD[k] = dot(pseudoInv, TN) return invD, fitinvP def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues(*args, **kwargs) diff --git a/rrompy/reduction_methods/standard/rational_interpolant.py b/rrompy/reduction_methods/standard/rational_interpolant.py index b0ae423..0f50c33 100644 --- a/rrompy/reduction_methods/standard/rational_interpolant.py +++ b/rrompy/reduction_methods/standard/rational_interpolant.py @@ -1,561 +1,561 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from .generic_standard_approximant import GenericStandardApproximant from rrompy.utilities.poly_fitting.polynomial import ( polybases as ppb, polyfitname, polyvander as pvP, polyvanderTotal as pvTP, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( polybases as mlspb, MovingLeastSquaresInterpolator as MLSI) from rrompy.reduction_methods.trained_model import ( TrainedModelRational as tModel) from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple, List, paramVal, sampList) from rrompy.utilities.base import verbosityManager as vbMng -from rrompy.utilities.numerical import (multifactorial, customPInv, +from rrompy.utilities.numerical import (multifactorial, customPInv, dot, fullDegreeN, totalDegreeN, degreeTotalToFull, fullDegreeMaxMask, totalDegreeMaxMask, nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalInterpolant'] class RationalInterpolant(GenericStandardApproximant): """ ROM rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; defaults to 0, i.e. identity; - 'nNearestNeighbor': mumber of nearest neighbors considered in numerator if polybasis allows; defaults to -1; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; - 'nNearestNeighbor': mumber of nearest neighbors considered in numerator if polybasis allows; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. radialDirectionalWeights: Radial basis weights for interpolant numerator. nNearestNeighbor: Number of nearest neighbors considered in numerator if polybasis allows. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["polybasis", "M", "N", "polydegreetype", "radialDirectionalWeights", "nNearestNeighbor", "interpRcond", "robustTol"], ["MONOMIAL", 0, 0, "TOTAL", 1, -1, -1, 0]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, verbosity = verbosity, timestamp = timestamp) self.catchInstability = False self._postInit() @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in ppb + rbpb + mlspb: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def polybasis0(self): if "_" in self.polybasis: return self.polybasis.split("_")[0] return self.polybasis @property def interpRcond(self): """Value of interpRcond.""" return self._interpRcond @interpRcond.setter def interpRcond(self, interpRcond): self._interpRcond = interpRcond self._approxParameters["interpRcond"] = self.interpRcond @property def radialDirectionalWeights(self): """Value of radialDirectionalWeights.""" return self._radialDirectionalWeights @radialDirectionalWeights.setter def radialDirectionalWeights(self, radialDirectionalWeights): self._radialDirectionalWeights = radialDirectionalWeights self._approxParameters["radialDirectionalWeights"] = ( self.radialDirectionalWeights) @property def nNearestNeighbor(self): """Value of nNearestNeighbor.""" return self._nNearestNeighbor @nNearestNeighbor.setter def nNearestNeighbor(self, nNearestNeighbor): self._nNearestNeighbor = nNearestNeighbor self._approxParameters["nNearestNeighbor"] = self.nNearestNeighbor @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N @property def polydegreetype(self): """Value of polydegreetype.""" return self._polydegreetype @polydegreetype.setter def polydegreetype(self, polydegreetype): try: polydegreetype = polydegreetype.upper().strip().replace(" ","") if polydegreetype not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetype not " "recognized.")) self._polydegreetype = polydegreetype except: RROMPyWarning(("Prescribed polydegreetype not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetype = "TOTAL" self._approxParameters["polydegreetype"] = self.polydegreetype @property def robustTol(self): """Value of tolerance for robust rational denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol def resetSamples(self): """Reset samples.""" super().resetSamples() self._musUniqueCN = None self._derIdxs = None self._reorder = None def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if self._musUniqueCN is None or len(self._reorder) != len(self.mus): self._musUniqueCN, musIdxsTo, musIdxs, musCount = ( self.trainedModel.centerNormalize(self.mus).unique( return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.mus[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1], cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator.", 7) while self.N > 0: invD, fitinv = self._computeInterpolantInverseBlocks() if self.POD: ev, eV = self.findeveVGQR(self.samplingEngine.RPOD, invD) else: ev, eV = self.findeveVGExplicit(self.samplingEngine.samples, invD) nevBad = checkRobustTolerance(ev, self.robustTol) if nevBad <= 1: break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: eigenproblem is poorly " "conditioned.")) RROMPyWarning(("Smallest {} eigenvalues below tolerance. Reducing " "N by 1.").format(nevBad)) self.N = self.N - 1 if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) q = PI() q.npar = self.npar q.polybasis = self.polybasis0 if self.polydegreetype == "TOTAL": q.coeffs = degreeTotalToFull(tuple([self.N + 1] * self.npar), self.npar, eV[:, 0]) else: q.coeffs = eV[:, 0].reshape([self.N + 1] * self.npar) vbMng(self, "DEL", "Done computing denominator.", 7) return q, fitinv def _setupNumerator(self): """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of numerator.", 7) Qevaldiag = np.zeros((len(self.mus), len(self.mus)), dtype = np.complex) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 self._setupInterpolationIndices() idxGlob = 0 for j, derIdxs in enumerate(self._derIdxs): nder = len(derIdxs) idxLoc = np.arange(len(self.mus))[(self._reorder >= idxGlob) * (self._reorder < idxGlob + nder)] idxGlob += nder Qval = [0] * nder for der in range(nder): derIdx = hashI(der, self.npar) Qval[der] = (self.trainedModel.getQVal( self._musUnique[j], derIdx, scl = np.power(self.scaleFactor, -1.)) / multifactorial(derIdx)) for derU, derUIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)] if all([x >= 0 for x in diffIdx]): diffj = hashD(diffIdx) Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj] if self.POD: Qevaldiag = Qevaldiag.dot(self.samplingEngine.RPOD.T) self.trainedModel.verbosity = verb cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN M = copy(self.M) while len(self.mus) < cfun(M, self.npar): M -= 1 if M < self.M: RROMPyWarning(("M too large compared to S. Reducing M by " "{}").format(self.M - M)) self.M = M while self.M >= 0: if self.polybasis in ppb: p = PI() wellCond, msg = p.setupByInterpolation( self._musUniqueCN, Qevaldiag, self.M, self.polybasis, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": np.power(self.scaleFactor, -1.)}, {"rcond": self.interpRcond}) elif self.polybasis in rbpb: p = RBI() wellCond, msg = p.setupByInterpolation( self._musUniqueCN, Qevaldiag, self.M, self.polybasis, self.radialDirectionalWeights, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": np.power(self.scaleFactor, -1.), "nNearestNeighbor": self.nNearestNeighbor}, {"rcond": self.interpRcond}) else:# if self.polybasis in mlspb: p = MLSI() wellCond, msg = p.setupByInterpolation( self._musUniqueCN, Qevaldiag, self.M, self.polybasis, self.radialDirectionalWeights, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": np.power(self.scaleFactor, -1.), "nNearestNeighbor": self.nNearestNeighbor}) vbMng(self, "MAIN", msg, 5) if wellCond: break if self.catchInstability: raise RROMPyException(("Instability in numerator computation: " "polyfit is poorly conditioned.")) RROMPyWarning("Polyfit is poorly conditioned. Reducing M by 1.") self.M = self.M - 1 if self.M < 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) vbMng(self, "DEL", "Done computing numerator.", 7) return p def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, self.samplingEngine.samples, self.scaleFactor, self.HFEngine.rescalingExp) data.mus = copy(self.mus) self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(self.samplingEngine.samples) if self.N > 0: Q = self._setupDenominator()[0] else: Q = PI() Q.coeffs = np.ones(tuple([1] * self.npar), dtype = np.complex) Q.npar = self.npar Q.polybasis = self.polybasis self.trainedModel.data.Q = Q self.trainedModel.data.P = self._setupNumerator() self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupInterpolationIndices() cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN N = copy(self.N) while len(self.mus) < cfun(N, self.npar): N -= 1 if N < self.N: RROMPyWarning(("N too large compared to S. Reducing N by " "{}").format(self.N - N)) self.N = N while self.N >= 0: if self.polydegreetype == "TOTAL": TE, _, argIdxs = pvTP(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TE = TE[:, argIdxs] idxsB = totalDegreeMaxMask(self.N, self.npar) else: #if self.polydegreetype == "FULL": TE = pvP(self._musUniqueCN, [self.N] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) idxsB = fullDegreeMaxMask(self.N, self.npar) fitOut = customPInv(TE, rcond = self.interpRcond, full = True) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... " "Conditioning of pseudoinverse system: {:.4e}.").format( TE.shape[0], self.N, polyfitname(self.polybasis0), fitOut[1][1][0] / fitOut[1][1][-1]), 5) if fitOut[1][0] == TE.shape[1]: fitinv = fitOut[0][idxsB, :] break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: polyfit is poorly " "conditioned.")) RROMPyWarning("Polyfit is poorly conditioned. Reducing N by 1.") self.N = self.N - 1 if self.polydegreetype == "TOTAL": TN, _, argIdxs = pvTP(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TN = TN[:, argIdxs] else: #if self.polydegreetype == "FULL": TN = pvP(self._musUniqueCN, [self.N] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) invD = [None] * (len(idxsB)) for k in range(len(idxsB)): pseudoInv = np.diag(fitinv[k, :]) idxGlob = 0 for j, derIdxs in enumerate(self._derIdxs): nder = len(derIdxs) idxGlob += nder if nder > 1: idxLoc = np.arange(len(self.mus))[ (self._reorder >= idxGlob - nder) * (self._reorder < idxGlob)] invLoc = fitinv[k, idxLoc] pseudoInv[np.ix_(idxLoc, idxLoc)] = 0. for diffj, diffjIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): derUIdx = [x - y for (x, y) in zip(diffjIdx, derQIdx)] if all([x >= 0 for x in derUIdx]): derU = hashD(derUIdx) pseudoInv[idxLoc[derU], idxLoc[derQ]] = ( invLoc[diffj]) - invD[k] = pseudoInv.dot(TN) + invD[k] = dot(pseudoInv, TN) return invD, fitinv def findeveVGExplicit(self, sampleE:sampList, invD:List[Np2D]) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of rational denominator matrix. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") nEnd = invD[0].shape[1] eWidth = len(invD) vbMng(self, "INIT", "Building gramian matrix.", 10) gramian = self.HFEngine.innerProduct(sampleE, sampleE) G = np.zeros((nEnd, nEnd), dtype = np.complex) for k in range(eWidth): - G += invD[k].T.conj().dot(gramian.dot(invD[k])) + G += dot(dot(gramian, invD[k]).T, invD[k].conj()).T vbMng(self, "DEL", "Done building gramian.", 10) vbMng(self, "INIT", "Solving eigenvalue problem for gramian matrix.", 7) ev, eV = np.linalg.eigh(G) vbMng(self, "MAIN", ("Solved eigenvalue problem of size {} with condition number " "{:.4e}.").format(nEnd, ev[-1] / ev[0]), 5) vbMng(self, "DEL", "Done solving eigenvalue problem.", 7) return ev, eV def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of rational denominator matrix through SVD of R factor. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") nEnd = invD[0].shape[1] S = RPODE.shape[0] eWidth = len(invD) vbMng(self, "INIT", "Building half-gramian matrix stack.", 10) Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex) for k in range(eWidth): - Rstack[k * S : (k + 1) * S, :] = RPODE.dot(invD[k]) + Rstack[k * S : (k + 1) * S, :] = dot(RPODE, invD[k]) vbMng(self, "DEL", "Done building half-gramian.", 10) vbMng(self, "INIT", "Solving svd for square root of gramian matrix.", 7) _, s, eV = np.linalg.svd(Rstack, full_matrices = False) ev = s[::-1] eV = eV[::-1, :].T.conj() vbMng(self, "MAIN", ("Solved svd problem of size {} x {} with condition number " "{:.4e}.").format(*Rstack.shape, s[0] / s[-1]), 5) vbMng(self, "DEL", "Done solving svd.", 7) return ev, eV def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues(*args, **kwargs) diff --git a/rrompy/reduction_methods/standard/rational_moving_least_squares.py b/rrompy/reduction_methods/standard/rational_moving_least_squares.py index 63b24ef..7ce711d 100644 --- a/rrompy/reduction_methods/standard/rational_moving_least_squares.py +++ b/rrompy/reduction_methods/standard/rational_moving_least_squares.py @@ -1,300 +1,301 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .rational_interpolant import RationalInterpolant from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb, polyvander as pvP, polyvanderTotal as pvTP) from rrompy.reduction_methods.trained_model import ( TrainedModelRationalMLS as tModel) from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import Np2D, HFEng, DictAny, paramVal from rrompy.utilities.base import verbosityManager as vbMng -from rrompy.utilities.numerical import fullDegreeMaxMask, totalDegreeMaxMask +from rrompy.utilities.numerical import (fullDegreeMaxMask, totalDegreeMaxMask, + dot) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalMovingLeastSquares'] class RationalMovingLeastSquares(RationalInterpolant): """ ROM rational moving LS interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'radialBasis': numerator radial basis type; defaults to 'GAUSSIAN'; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; defaults to 0, i.e. identity; - 'nNearestNeighbor': number of nearest neighbors considered in numerator if radialBasis allows; defaults to -1; - 'radialBasisDen': denominator radial basis type; defaults to 'GAUSSIAN'; - 'radialDirectionalWeightsDen': radial basis weights for interpolant denominator; defaults to 0, i.e. identity; - 'nNearestNeighborDen': number of nearest neighbors considered in denominator if radialBasisDen allows; defaults to -1; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'radialBasis': numerator radial basis type; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; - 'nNearestNeighbor': number of nearest neighbors considered in numerator if radialBasis allows; - 'radialBasisDen': denominator radial basis type; - 'radialDirectionalWeightsDen': radial basis weights for interpolant denominator; - 'nNearestNeighborDen': number of nearest neighbors considered in denominator if radialBasisDen allows; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. radialBasis: Numerator radial basis type. radialDirectionalWeights: Radial basis weights for interpolant numerator. nNearestNeighbor: Number of nearest neighbors considered in numerator if radialBasis allows. radialBasisDen: Denominator radial basis type. radialDirectionalWeightsDen: Radial basis weights for interpolant denominator. nNearestNeighborDen: Number of nearest neighbors considered in denominator if radialBasisDen allows. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["radialBasis", "radialBasisDen", "radialDirectionalWeightsDen", "nNearestNeighborDen"], ["GAUSSIAN", "GAUSSIAN", 1, -1]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, verbosity = verbosity, timestamp = timestamp) self.catchInstability = False self._postInit() @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in ppb: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def radialBasis(self): """Value of radialBasis.""" return self._radialBasis @radialBasis.setter def radialBasis(self, radialBasis): self._radialBasis = radialBasis self._approxParameters["radialBasis"] = self.radialBasis @property def radialBasisDen(self): """Value of radialBasisDen.""" return self._radialBasisDen @radialBasisDen.setter def radialBasisDen(self, radialBasisDen): self._radialBasisDen = radialBasisDen self._approxParameters["radialBasisDen"] = self.radialBasisDen @property def radialDirectionalWeightsDen(self): """Value of radialDirectionalWeightsDen.""" return self._radialDirectionalWeightsDen @radialDirectionalWeightsDen.setter def radialDirectionalWeightsDen(self, radialDirectionalWeightsDen): self._radialDirectionalWeightsDen = radialDirectionalWeightsDen self._approxParameters["radialDirectionalWeightsDen"] = ( self.radialDirectionalWeightsDen) @property def nNearestNeighborDen(self): """Value of nNearestNeighborDen.""" return self._nNearestNeighborDen @nNearestNeighborDen.setter def nNearestNeighborDen(self, nNearestNeighborDen): self._nNearestNeighborDen = nNearestNeighborDen self._approxParameters["nNearestNeighborDen"] = ( self.nNearestNeighborDen) def _setupDenominator(self) -> Np2D: """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator-related blocks.", 7) self._setupInterpolationIndices() if self.polydegreetype == "TOTAL": TN, _, argIdxs = pvTP(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TN = TN[:, argIdxs] else: #if self.polydegreetype == "FULL": TN = pvP(self._musUniqueCN, [self.N] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TNTen = np.zeros((self.S, self.S, TN.shape[1]), dtype = TN.dtype) TNTen[np.arange(self.S), np.arange(self.S)] = TN - if self.POD: TNTen = np.tensordot(self.samplingEngine.RPOD, TNTen, 1) + if self.POD: TNTen = dot(self.samplingEngine.RPOD, TNTen) vbMng(self, "DEL", "Done computing denominator-related blocks.", 7) return TN, TNTen def _setupNumerator(self) -> Np2D: """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of denominator-related blocks.", 7) self._setupInterpolationIndices() if self.polydegreetype == "TOTAL": TM, _, argIdxs = pvTP(self._musUniqueCN, self.M, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TM = TM[:, argIdxs] else: #if self.polydegreetype == "FULL": TM = pvP(self._musUniqueCN, [self.M] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) vbMng(self, "DEL", "Done computing denominator-related blocks.", 7) return TM def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, self.samplingEngine.samples, self.scaleFactor, self.HFEngine.rescalingExp) data.POD = self.POD data.polybasis = self.polybasis data.polydegreetype = self.polydegreetype data.radialBasis = self.radialBasis data.radialWeights = self.radialDirectionalWeights data.nNearestNeighbor = self.nNearestNeighbor data.radialBasisDen = self.radialBasisDen data.radialWeightsDen = self.radialDirectionalWeightsDen data.nNearestNeighborDen = self.nNearestNeighborDen data.interpRcond = self.interpRcond self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(self.samplingEngine.samples) if not self.POD: self.trainedModel.data.gramian = self.HFEngine.innerProduct( self.samplingEngine.samples, self.samplingEngine.samples) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.M = self.M self.trainedModel.data.N = self.N QVan, self.trainedModel.data.QBlocks = self._setupDenominator() self.trainedModel.data.PVan = self._setupNumerator() if self.polydegreetype == "TOTAL": degreeMaxMask = totalDegreeMaxMask else: #if self.polydegreetype == "FULL": degreeMaxMask = fullDegreeMaxMask if self.N > self.M: self.trainedModel.data.QVan = QVan self.trainedModel.data.domQIdxs = degreeMaxMask(self.N, self.npar) else: self.trainedModel.data.domQIdxs = degreeMaxMask(self.M, self.npar) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) diff --git a/rrompy/reduction_methods/standard/reduced_basis.py b/rrompy/reduction_methods/standard/reduced_basis.py index 33574be..efcb121 100644 --- a/rrompy/reduction_methods/standard/reduced_basis.py +++ b/rrompy/reduction_methods/standard/reduced_basis.py @@ -1,201 +1,202 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_standard_approximant import GenericStandardApproximant from rrompy.reduction_methods.trained_model import \ TrainedModelReducedBasis as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.reduction_methods.base.reduced_basis_utils import \ projectAffineDecomposition from rrompy.utilities.base.types import (Np1D, Np2D, List, Tuple, DictAny, HFEng, paramVal, sampList) from rrompy.utilities.base import verbosityManager as vbMng +from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert) __all__ = ['ReducedBasis'] class ReducedBasis(GenericStandardApproximant): """ ROM RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'R': rank for Galerkin projection; defaults to S; - 'PODTolerance': tolerance for snapshots POD; defaults to -1. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxRadius: Dummy radius of approximant (i.e. distance from mu0 to farthest sample point). approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'R': rank for Galerkin projection; - 'PODTolerance': tolerance for snapshots POD. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. R: Rank for Galerkin projection. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix. bs: List of numpy vectors representing coefficients of linear system RHS. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix. bRBs: List of numpy vectors representing coefficients of compressed linear system RHS. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["R", "PODTolerance"], ["AUTO", -1]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def R(self): """Value of R. Its assignment may change S.""" return self._R @R.setter def R(self, R): if R == "AUTO": if not hasattr(self, "_S"): raise RROMPyException(("Cannot assign R automatically without " "S.")) R = self.S if R < 0: raise RROMPyException("R must be non-negative.") self._R = R self._approxParameters["R"] = self.R @property def PODTolerance(self): """Value of PODTolerance.""" return self._PODTolerance @PODTolerance.setter def PODTolerance(self, PODTolerance): self._PODTolerance = PODTolerance self._approxParameters["PODTolerance"] = self.PODTolerance def _setupProjectionMatrix(self): """Compute projection matrix.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of projection matrix.", 7) nsamples = self.samplingEngine.nsamples if self.R > nsamples: RROMPyWarning(("R too large compared to S. Reducing R by " "{}").format(self.R - nsamples)) self.R = nsamples if self.POD: U, s, _ = np.linalg.svd(self.samplingEngine.RPOD) s = s ** 2. else: Gramian = self.HFEngine.innerProduct(self.samplingEngine.samples, self.samplingEngine.samples) U, s, _ = np.linalg.svd(Gramian) snorm = np.cumsum(s[::-1]) / np.sum(s) nPODTrunc = min(nsamples - np.argmax(snorm > self.PODTolerance), self.R) - pMat = self.samplingEngine.samples.dot(U[:, : nPODTrunc]) + pMat = dot(self.samplingEngine.samples, U[:, : nPODTrunc]) vbMng(self, "MAIN", ("Assembling {}x{} projection matrix from {} " "samples.").format(*(pMat.shape), nsamples), 5) vbMng(self, "DEL", "Done computing projection matrix.", 7) return pMat def setupApprox(self): """Compute RB projection matrix.""" if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() pMat = self._setupProjectionMatrix() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, pMat, self.scaleFactor, self.HFEngine.rescalingExp) data.affinePoly = self.HFEngine.affinePoly data.mus = copy(self.mus) data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMat) ARBs, bRBs = self.assembleReducedSystem(pMat) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) def assembleReducedSystem(self, pMat : sampList = None, pMatOld : sampList = None)\ -> Tuple[List[Np2D], List[Np1D]]: """Build affine blocks of RB linear system through projections.""" if pMat is None: self.setupApprox() ARBs = self.trainedModel.data.ARBs bRBs = self.trainedModel.data.bRBs else: vbMng(self, "INIT", "Projecting affine terms of HF model.", 10) ARBsOld = None if pMatOld is None else self.trainedModel.data.ARBs bRBsOld = None if pMatOld is None else self.trainedModel.data.bRBs ARBs, bRBs = projectAffineDecomposition(self.HFEngine.As, self.HFEngine.bs, pMat, ARBsOld, bRBsOld, pMatOld) vbMng(self, "DEL", "Done projecting affine terms.", 10) return ARBs, bRBs diff --git a/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py b/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py index 96b219a..9cddbce 100644 --- a/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py +++ b/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py @@ -1,375 +1,375 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.special import factorial as fact from itertools import combinations from .trained_model import TrainedModel from rrompy.utilities.base.types import (Np1D, Tuple, List, ListAny, paramVal, paramList, sampList, HFEng) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import pointMatching from rrompy.utilities.poly_fitting.heaviside import HeavisideInterpolator as HI from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import emptySampleList, sampleList __all__ = ['TrainedModelPivotedGeneral'] class TrainedModelPivotedGeneral(TrainedModel): """ ROM approximant evaluation for pivoted approximants (with pole matching). Attributes: Data: dictionary with all that can be pickled. """ def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.nparPivot)[0] if mu0 is None: mu0 = self.data.mu0Pivot rad = ((mu ** self.data.rescalingExpPivot - mu0 ** self.data.rescalingExpPivot) / self.data.scaleFactorPivot) return rad def centerNormalizeMarginal(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.nparMarginal)[0] if mu0 is None: mu0 = self.data.mu0Marginal rad = ((mu ** self.data.rescalingExpMarginal - mu0 ** self.data.rescalingExpMarginal) / self.data.scaleFactorMarginal) return rad def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str, HFEngine:HFEng, matchingWeight : float = 1., POD : bool = True): """Initialize Heaviside representation.""" musM = self.data.musMarginal margAbsDist = np.sum(np.abs(np.repeat(musM.data, len(musM), 0) - np.tile(musM.data, [len(musM), 1]) ), axis = 1).reshape(len(musM), len(musM)) N = len(poles[0]) explored = [0] unexplored = list(range(1, len(musM))) for _ in range(1, len(musM)): minIdx = np.argmin(np.concatenate([margAbsDist[ex, unexplored] \ for ex in explored])) minIex = explored[minIdx // len(unexplored)] minIunex = unexplored[minIdx % len(unexplored)] dist = np.abs(np.tile(poles[minIex].reshape(-1, 1), N) - poles[minIunex].reshape(1, -1)) if matchingWeight != 0: resex = coeffs[minIex][: N] resunex = coeffs[minIunex][: N] if POD: distR = resex.dot(resunex.T.conj()) distR = (distR.T / np.linalg.norm(resex, axis = 1)).T distR = distR / np.linalg.norm(resunex, axis = 1) else: resex = self.data.projMat.dot(resex.T) resunex = self.data.projMat.dot(resunex.T) distR = HFEngine.innerProduct(resex, resunex).T distR = (distR.T / HFEngine.norm(resex)).T distR = distR / HFEngine.norm(resunex) distR = np.abs(distR) distR[distR > 1.] = 1. dist += 2. / np.pi * matchingWeight * np.arccos(distR) reordering = pointMatching(dist) poles[minIunex] = poles[minIunex][reordering] coeffs[minIunex][: N] = coeffs[minIunex][reordering] explored += [minIunex] unexplored.remove(minIunex) HIs = [] for pls, cfs in zip(poles, coeffs): hsi = HI() hsi.poles = pls hsi.coeffs = cfs hsi.npar = 1 hsi.polybasis = basis HIs += [hsi] self.data.HIs = HIs def recompressByCutOff(self, murange : Tuple[float, float] = [- 1., 1.], tol : float = np.inf, rtype : str = "MAGNITUDE"): if np.isinf(tol): return " No poles erased." N = len(self.data.HIs[0].poles) mu0 = np.mean(murange) musig = murange[0] - mu0 if np.isclose(musig, 0.): radius = lambda x: np.abs(x - mu0) else: if rtype == "MAGNITUDE": murdir = (murange[0] - mu0) / np.abs(musig) def radius(x): scalprod = (x - mu0) * murdir.conj() / np.abs(musig) rescalepar = np.abs(np.real(scalprod)) rescaleort = np.abs(np.imag(scalprod)) return ((rescalepar - 1.) ** 2. * (rescalepar > 1.) + rescaleort ** 2.) ** .5 else:#if rtype == "POTENTIAL": def radius(x): rescale = (x - mu0) / musig return np.max(np.abs(rescale * np.array([-1., 1.]) + (rescale ** 2. - 1) ** .5)) - 1. keepPole, removePole = [], [] for j in range(N): for hi in self.data.HIs: if radius(hi.poles[j]) <= tol: keepPole += [j] break if len(keepPole) == 0 or keepPole[-1] != j: removePole += [j] if len(keepPole) == N: return " No poles erased." keepCoeff = keepPole + [N] keepCoeff = keepCoeff + list(range(N + 1,len(self.data.HIs[0].coeffs))) for hi in self.data.HIs: polyCorrection = np.zeros_like(hi.coeffs[0, :]) for j in removePole: polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j]) if len(hi.coeffs) == N: hi.coeffs = np.vstack((hi.coeffs, polyCorrection)) else: hi.coeffs[N, :] += polyCorrection hi.poles = hi.poles[keepPole] hi.coeffs = hi.coeffs[keepCoeff, :] return (" Erased {} pole".format(len(removePole)) + "s" * (len(removePole) > 1) + ".") def interpolateMarginal(self, mu : paramList = [], samples : ListAny = [], der : List[int] = None, scl : Np1D = None) -> sampList: """ Evaluate marginal interpolator at arbitrary marginal parameter. Args: mu: Target parameter. samples: Objects to interpolate. der(optional): Derivatives to take before evaluation. """ mu = checkParameterList(mu, self.data.nparMarginal)[0] sList = isinstance(samples[0], sampleList) sEff = [None] * len(samples) for j in range(len(samples)): if sList: sEff[j] = samples[j].data else: sEff[j] = samples[j] try: dtype = sEff[0].dtype except: dtype = sEff[0][0].dtype vbMng(self, "INIT", "Interpolating marginal at mu = {}.".format(mu), 95) muC = self.centerNormalizeMarginal(mu) p = emptySampleList() p.reset((len(sEff[0]), len(muC)), dtype = dtype) p.data[:] = 0. if len(sEff[0]) > 0: for mIj, spj in zip(self.data.marginalInterp, sEff): p = p + spj.reshape(len(sEff[0]), - 1) * mIj(muC, der, scl) vbMng(self, "DEL", "Done interpolating marginal.", 95) if not sList: p = p.data.flatten() return p def interpolateMarginalInterpolator(self, mu : paramVal = []) -> Np1D: """Obtain interpolated approximant interpolator.""" mu = checkParameter(mu, self.data.nparMarginal)[0] hsi = HI() hsi.poles = self.interpolateMarginalPoles(mu) hsi.coeffs = self.interpolateMarginalCoeffs(mu) hsi.npar = 1 hsi.polybasis = self.data.HIs[0].polybasis return hsi def interpolateMarginalPoles(self, mu : paramList = []) -> Np1D: """Obtain interpolated approximant poles.""" mu = checkParameterList(mu, self.data.nparMarginal)[0] return self.interpolateMarginal(mu, [hi.poles for hi in self.data.HIs]) def interpolateMarginalCoeffs(self, mu : paramList = []) -> Np1D: """Obtain interpolated approximant coefficients.""" mu = checkParameterList(mu, self.data.nparMarginal)[0] cs = self.interpolateMarginal(mu, [hi.coeffs for hi in self.data.HIs]) if isinstance(cs, (list, tuple,)): cs = np.array(cs) return cs.reshape(self.data.HIs[0].coeffs.shape) def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) self.uApproxReduced = emptySampleList() for i, muPL in enumerate(mu): muL = self.centerNormalizePivot([muPL(0, x) \ for x in self.data.directionPivot]) muM = [muPL(0, x) for x in self.data.directionMarginal] vbMng(self, "INIT", "Assembling reduced model for mu = {}.".format(muPL), 87) hsL = self.interpolateMarginalInterpolator(muM) vbMng(self, "DEL", "Done assembling reduced model.", 87) uAppR = hsL(muL) if i == 0: #self.data.HIs[0].coeffs.shape[1], len(mu) self.uApproxReduced.reset((len(uAppR), len(mu)), dtype = uAppR.dtype) self.uApproxReduced[i] = uAppR vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] p = emptySampleList() p.reset((len(self.data.HIs[0].coeffs.shape[1]), len(mu))) for i, muPL in enumerate(mu): muL = self.centerNormalizePivot([muPL(0, x) \ for x in self.data.directionPivot]) muM = [muPL(0, x) for x in self.data.directionMarginal] hsL = self.interpolateMarginalInterpolator(muM) p[i] = hsL(muL) * np.prod(muL(0, 0) - hsL.poles) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] muP = self.centerNormalizePivot(checkParameterList( mu.data[:, self.data.directionPivot], self.data.nparPivot)[0]) muM = checkParameterList(mu.data[:, self.data.directionMarginal], self.data.nparMarginal)[0] if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] derVal = np.zeros(len(mu), dtype = np.complex) N = len(self.data.HIs[0].poles) if derP == N: derVal[:] = 1. elif derP >= 0 and derP < N: pls = self.interpolateMarginalPoles(muM).reshape(-1, len(mu)).T plsDist = muP.data.reshape(-1, 1) - pls for terms in combinations(np.arange(N), N - derP): derVal += np.prod(plsDist[:, list(terms)], axis = 1) return sclP ** derP * fact(derP) * derVal def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] - roots = np.array(self.interpolateMarginalPoles(mMarg)) + roots = np.sort(np.array(self.interpolateMarginalPoles(mMarg))) return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim] + self.data.scaleFactor[rDim] * roots, 1. / self.data.rescalingExp[rDim]) def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] residues = self.interpolateMarginalCoeffs(mMarg)[: len(pls)] res = self.data.projMat.dot(residues.T) return pls, res diff --git a/rrompy/reduction_methods/trained_model/trained_model_rational.py b/rrompy/reduction_methods/trained_model/trained_model_rational.py index 679482e..c7dcebc 100644 --- a/rrompy/reduction_methods/trained_model/trained_model_rational.py +++ b/rrompy/reduction_methods/trained_model/trained_model_rational.py @@ -1,163 +1,164 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from .trained_model import TrainedModel from rrompy.utilities.base.types import (Np1D, List, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.exception_manager import RROMPyException from rrompy.parameter import (checkParameter, checkParameterList, emptyParameterList) from rrompy.sampling import sampleList __all__ = ['TrainedModelRational'] class TrainedModelRational(TrainedModel): """ ROM approximant evaluation for rational approximant. Attributes: Data: dictionary with all that can be pickled. """ def centerNormalize(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if mu0 is None: mu0 = self.data.mu0 rad = ((mu ** self.data.rescalingExp - mu0 ** self.data.rescalingExp) / self.data.scaleFactor) return rad def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17) muCenter = self.centerNormalize(mu) p = sampleList(self.data.P(muCenter)) vbMng(self, "DEL", "Done evaluating numerator.", 17) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ mu = checkParameterList(mu, self.data.npar)[0] vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu), 17) muCenter = self.centerNormalize(mu) q = self.data.Q(muCenter, der, scl) vbMng(self, "DEL", "Done evaluating denominator.", 17) return q def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) self.uApproxReduced = self.getPVal(mu) / self.getQVal(mu) vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) mVals[rDim] = self.data.mu0(rDim) mVals = self.centerNormalize(checkParameter(mVals, len(mVals))) mVals = list(mVals.data.flatten()) mVals[rDim] = fp return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim] - + self.data.scaleFactor[rDim] * self.data.Q.roots(mVals), + + self.data.scaleFactor[rDim] + * np.sort(self.data.Q.roots(mVals)), 1. / self.data.rescalingExp[rDim]) def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) poles = emptyParameterList() poles.reset((len(pls), self.data.npar), dtype = pls.dtype) for k, pl in enumerate(pls): poles[k] = mVals poles.data[k, rDim] = pl res = (self.data.projMat.dot(self.getPVal(poles).data) / self.getQVal(poles, list(1 * (np.arange(self.data.npar) == rDim)))) return pls, res diff --git a/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py b/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py index f3cf511..33e8364 100644 --- a/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py +++ b/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py @@ -1,115 +1,115 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from .trained_model import TrainedModel from rrompy.utilities.base.types import (Np1D, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import (eigvalsNonlinearDense, marginalizePolyList) from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import emptySampleList __all__ = ['TrainedModelReducedBasis'] class TrainedModelReducedBasis(TrainedModel): """ ROM approximant evaluation for RB approximant. Attributes: Data: dictionary with all that can be pickled. """ def assembleReducedModel(self, mu:paramVal): mu = checkParameter(mu, self.data.npar) if not hasattr(self, "lastSetupMu") or self.lastSetupMu != mu: vbMng(self, "INIT", "Assembling reduced model for mu = {}."\ .format(mu), 17) muEff = mu ** self.data.rescalingExp self.data.ARBmu, self.data.bRBmu = 0., 0. for thA, ARB in zip(self.data.thAs, self.data.ARBs): self.data.ARBmu = (expressionEvaluator(thA[0], muEff) * ARB + self.data.ARBmu) for thb, bRB in zip(self.data.thbs, self.data.bRBs): self.data.bRBmu = (expressionEvaluator(thb[0], muEff) * bRB + self.data.bRBmu) vbMng(self, "DEL", "Done assembling reduced model.", 17) self.lastSetupMu = mu def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Computing RB solution at mu = {}.".format(mu), 12) self.uApproxReduced = emptySampleList() for i in range(len(mu)): self.assembleReducedModel(mu[i]) vbMng(self, "INIT", "Solving reduced model for mu = {}.".format(mu[i]), 15) uAppR = np.linalg.solve(self.data.ARBmu, self.data.bRBmu) if i == 0: #self.data.ARBs[0].shape[-1], len(mu) self.uApproxReduced.reset((len(uAppR), len(mu)), dtype = uAppR.dtype) self.uApproxReduced[i] = uAppR vbMng(self, "DEL", "Done solving reduced model.", 15) vbMng(self, "DEL", "Done computing RB solution.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, marginalVals : ListAny = [fp], jSupp : int = 1, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if not self.data.affinePoly: RROMPyWarning(("Unable to compute approximate poles due " "to parametric dependence (detected non-" "polynomial). Change HFEngine.affinePoly to True " "if necessary.")) return if not hasattr(marginalVals, "__len__"): marginalVals = [marginalVals] mVals = list(marginalVals) try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) ARBs = self.data.ARBs if self.data.npar > 1: mVals[rDim] = self.data.mu0(rDim) mVals = checkParameter(mVals).data.flatten() mVals[rDim] = fp ARBs = marginalizePolyList(ARBs, mVals, "auto") ev = eigvalsNonlinearDense(ARBs, jSupp = jSupp, **kwargs) - return np.power(ev, 1. / self.data.rescalingExp[rDim]) + return np.sort(np.power(ev, 1. / self.data.rescalingExp[rDim])) diff --git a/rrompy/sampling/base/pod_engine.py b/rrompy/sampling/base/pod_engine.py index 4e3b9b7..b3298a4 100644 --- a/rrompy/sampling/base/pod_engine.py +++ b/rrompy/sampling/base/pod_engine.py @@ -1,133 +1,134 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.utilities.base.types import Np1D, Np2D, Tuple, HFEng, sampList +from rrompy.utilities.numerical import dot from rrompy.sampling import sampleList __all__ = ['PODEngine'] class PODEngine: """ POD engine for general matrix orthogonalization. """ def __init__(self, HFEngine:HFEng): self.HFEngine = HFEngine def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def GS(self, a:Np1D, Q:sampList, n : int = -1) -> Tuple[Np1D, Np1D, bool]: """ Compute 1 Gram-Schmidt step with given projector. Args: a: vector to be projected; Q: orthogonal projection matrix; n: number of columns of Q to be considered; Returns: Resulting normalized vector, coefficients of a wrt the updated basis, whether computation is ill-conditioned. """ if n == -1: n = Q.shape[1] r = np.zeros((n + 1,), dtype = Q.dtype) if n > 0: Q = Q[: n] for j in range(2): # twice is enough! nu = self.HFEngine.innerProduct(a, Q) - a = a - Q.dot(nu) + a = a - dot(Q, nu) r[:-1] = r[:-1] + nu.flatten() r[-1] = self.HFEngine.norm(a) ill_cond = False if np.isclose(np.abs(r[-1]) / np.linalg.norm(r), 0.): ill_cond = True r[-1] = 1. a = a / r[-1] return a, r, ill_cond def generalizedQR(self, A:sampList, Q0 : sampList = None, only_R : bool = False, genTrials : int = 10) -> Tuple[sampList, Np2D]: """ Compute generalized QR decomposition of a matrix through Householder method. Args: A: matrix to be decomposed; Q0(optional): initial orthogonal guess for Q; defaults to random; only_R(optional): whether to skip reconstruction of Q; defaults to False. genTrials(optional): number of trials of generation of linearly independent vector; defaults to 10. Returns: Resulting (orthogonal and )upper-triangular factor(s). """ Nh, N = A.shape B = copy(A) V = copy(A) R = np.zeros((N, N), dtype = A.dtype) if Q0 is None: Q = sampleList(np.zeros(A.shape, dtype = A.dtype) + np.random.randn(*(A.shape))) else: Q = copy(Q0) for k in range(N): a = B[k] R[k, k] = self.HFEngine.norm(a) if Q0 is None: for _ in range(genTrials): Q[k], _, illC = self.GS(np.random.randn(Nh), Q, k) if not illC: break else: illC = False if illC: Q[k] = np.zeros(Nh, dtype = Q.dtype) alpha = 0. else: alpha = self.HFEngine.innerProduct(a, Q[k]) if np.isclose(np.abs(alpha), 0.): s = 1. else: s = - alpha / np.abs(alpha) Q[k] = s * Q[k] V[k], _, _ = self.GS(R[k, k] * Q[k] - a, Q, k) J = np.arange(k + 1, N) vtB = self.HFEngine.innerProduct(B[J], V[k]) B.data[:, J] -= 2 * np.outer(V[k], vtB) if illC: R[k, J] = 0. else: R[k, J] = self.HFEngine.innerProduct(B[J], Q[k]) B.data[:, J] -= np.outer(Q[k], R[k, J]) if only_R: return R for k in range(N - 1, -1, -1): J = list(range(k, N)) vtQ = self.HFEngine.innerProduct(Q[J], V[k]) Q.data[:, J] -= 2 * np.outer(V[k], vtQ) return Q, R diff --git a/rrompy/sampling/sample_list.py b/rrompy/sampling/sample_list.py index f050c88..75ac132 100644 --- a/rrompy/sampling/sample_list.py +++ b/rrompy/sampling/sample_list.py @@ -1,222 +1,226 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.utilities.base.types import Np1D, List +from rrompy.utilities.numerical import dot __all__ = ['emptySampleList', 'sampleList'] def emptySampleList(): return sampleList(np.empty((0, 0))) class sampleList: """HERE""" def __init__(self, data:List[Np1D], lengthCheck : int = None, deep : bool = True): if isinstance(data, (self.__class__,)): data = data.data if isinstance(data, (np.ndarray,)): self.data = copy(data) if deep else data if self.data.ndim <= 1: self.data.shape = (self.data.shape[0], 1) else: if not isinstance(data, (list,)): data = [data] self.data = np.empty((len(data[0]), len(data)), dtype = data[0].dtype) for j, par in enumerate(data): self[j] = copy(data[j]) if deep else data[j] if j == 0 and lengthCheck is None: lengthCheck = self.shape[0] RROMPyAssert(len(data[j]), lengthCheck, "Number of parameters") def __len__(self): return self.shape[1] def __str__(self): return str(self.data) def __repr__(self): return repr(self.data) @property def shape(self): return self.data.shape @property def re(self): return sampleList(np.real(self.data)) @property def im(self): return sampleList(np.imag(self.data)) @property def abs(self): return sampleList(np.abs(self.data)) @property def angle(self): return sampleList(np.angle(self.data)) def conj(self): return sampleList(np.conj(self.data)) @property def T(self): return sampleList(self.data.T) @property def H(self): return sampleList(self.data.T.conj()) @property def dtype(self): return self.data.dtype @dtype.setter def dtype(self, dtype): self.data.dtype = dtype def __getitem__(self, key): return self.data[:, key] def __call__(self, key): return sampleList(self.data[:, key]) def __setitem__(self, key, value): if isinstance(value, self.__class__): value = value.data if isinstance(key, (tuple, list,)): RROMPyAssert(len(key), len(value), "Slice length") for k, val in zip(key, value): self[k] = val else: self.data[:, key] = value.flatten() def __iter__(self): return self.data.T.__iter__() def __eq__(self, other): if not hasattr(other, "shape") or self.shape != other.shape: return False if isinstance(other, self.__class__): fac = other.data else: fac = other return np.allclose(self.data, fac) def __ne__(self, other): return not self == other def __copy__(self): return sampleList(self.data) def __deepcopy__(self, memo): return sampleList(copy(self.data, memo)) def __add__(self, other): if isinstance(other, self.__class__): RROMPyAssert(self.shape, other.shape, "Sample shape") fac = other.data else: fac = other return sampleList(self.data + fac) def __iadd__(self, other): self.data = (self + other).data return self def __sub__(self, other): if isinstance(other, self.__class__): RROMPyAssert(self.shape, other.shape, "Sample shape") fac = other.data else: fac = other return sampleList(self.data - fac) def __isub__(self, other): self.data = (self - other).data return self def __mul__(self, other): if isinstance(other, self.__class__): RROMPyAssert(self.shape, other.shape, "Sample shape") fac = other.data else: fac = other return sampleList(self.data * fac) def __imul__(self, other): self.data = (self * other).data return self def __truediv__(self, other): if isinstance(other, self.__class__): RROMPyAssert(self.shape, other.shape, "Sample shape") fac = other.data else: fac = other return sampleList(self.data / fac) def __idiv__(self, other): self.data = (self / other).data return self def __pow__(self, other): if isinstance(other, self.__class__): RROMPyAssert(self.shape, other.shape, "Sample shape") fac = other.data else: fac = other return sampleList(np.power(self.data, fac)) def __ipow__(self, other): self.data = (self ** other).data return self def __neg__(self): return sampleList(- self.data) def __pos__(self): return sampleList(self.data) def reset(self, size, dtype = np.complex): self.data = np.empty(size, dtype = dtype) self.data[:] = np.nan def append(self, items): if isinstance(items, self.__class__): fac = items.data else: fac = np.array(items, ndmin = 2) self.data = np.append(self.data, fac, axis = 1) def pop(self, idx = -1): self.data = np.delete(self.data, idx, axis = 1) def dot(self, other, sampleListOut : bool = True): if isinstance(other, self.__class__): other = other.data - prod = self.data.dot(other) + try: + prod = dot(self.data, other) + except: + prod = dot(other.T, self.data.T).T if sampleListOut: prod = sampleList(prod) return prod diff --git a/rrompy/solver/norm_utilities.py b/rrompy/solver/norm_utilities.py index fa40919..244ec67 100644 --- a/rrompy/solver/norm_utilities.py +++ b/rrompy/solver/norm_utilities.py @@ -1,99 +1,100 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np from copy import deepcopy as copy from rrompy.utilities.base.types import Np1D, Np2D, DictAny +from rrompy.utilities.numerical import dot as tdot, solve as tsolve from rrompy.solver.linear_solver import setupSolver from rrompy.utilities.exception_manager import RROMPyException __all__ = ['Np2DLike', 'Np2DLikeEye', 'Np2DLikeInv', 'Np2DLikeInvLowRank', 'normEngine'] @abstractmethod class Np2DLike: def dot(self, u:Np2D) -> Np2D: pass class Np2DLikeEye(Np2DLike): def __init__(self, n:int): self.n = n @property def T(self): return self @property def shape(self): return (self.n, self.n) def conj(self): return self def dot(self, u:Np2D) -> Np2D: return u class Np2DLikeInv(Np2DLike): def __init__(self, K:Np2D, M:Np2D, solverType:str, solverArgs:DictAny): self.K, self.M, self.MH = K, M, M.T.conj() try: self.solver, self.solverArgs = setupSolver(solverType, solverArgs) except: self.solver, self.solverArgs = solverType, solverArgs def dot(self, u:Np2D) -> Np2D: - return self.MH.dot(self.solver(self.K, self.M.dot(u), - self.solverArgs)).reshape(u.shape) + return tdot(self.MH, tsolve(self.K, tdot(self.M, u), self.solver, + self.solverArgs)).reshape(u.shape) @property def shape(self): return (self.MH.shape[0], self.M.shape[1]) class Np2DLikeInvLowRank(Np2DLike): def __init__(self, K:Np2D, M:Np2D, solverType:str, solverArgs:DictAny, rank:int, oversampling : int = 10, seed : int = 420): sizeO = K.shape[1] if hasattr(K, "shape") else M.shape[1] if rank > sizeO: raise RROMPyException(("Cannot select compressed rank larger than " "original size.")) if oversampling < 0: raise RROMPyException("Oversampling parameter must be positive.") HF = Np2DLikeInv(K, M, solverType, solverArgs) np.random.seed(seed) xs = np.random.randn(sizeO, rank + oversampling) samples = HF.dot(xs) Q, _ = np.linalg.qr(samples, mode = "reduced") R = HF.dot(Q).T.conj() # assuming HF (i.e. K) hermitian... - U, s, Vh = np.linalg.svd(R) + U, s, Vh = np.linalg.svd(R, full_matrices = False) self.L = Q.dot(U[:, : rank]) * s[: rank] self.R = Vh[: rank, :] def dot(self, u:Np2D) -> Np2D: - return self.L.dot(self.R.dot(u)).reshape(u.shape) + return tdot(self.L, tdot(self.R, u)).reshape(u.shape) @property def shape(self): return (self.L.shape[0], self.R.shape[1]) class normEngine: def __init__(self, energyNormMatrix:Np2D): self.energyNormMatrix = copy(energyNormMatrix) def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False) -> Np2D: if not isinstance(u, (np.ndarray,)): u = u.data if not isinstance(v, (np.ndarray,)): v = v.data if onlyDiag: - return np.sum(self.energyNormMatrix.dot(u) * v.conj(), axis = 0) - return v.T.conj().dot(self.energyNormMatrix.dot(u)) + return np.sum(tdot(self.energyNormMatrix, u) * v.conj(), axis = 0) + return tdot(tdot(self.energyNormMatrix, u).T, v.conj()).T def norm(self, u:Np2D) -> Np1D: return np.power(np.abs(self.innerProduct(u, u, onlyDiag = True)), .5) diff --git a/rrompy/utilities/expression/monomial_creator.py b/rrompy/utilities/expression/monomial_creator.py index 33c285e..dcbffe1 100644 --- a/rrompy/utilities/expression/monomial_creator.py +++ b/rrompy/utilities/expression/monomial_creator.py @@ -1,57 +1,57 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.numerical import (multibinom, nextDerivativeIndices, hashIdxToDerivative as hashI, hashDerivativeToIdx as hashD) from rrompy.utilities.base.types import List, TupleAny __all__ = ["createMonomial", "createMonomialList"] def createMonomial(deg:List[int], return_derivatives : bool = False) -> List[List[TupleAny]]: if not hasattr(deg, "__len__"): deg = [deg] dim = len(deg) degj = hashD(deg) expr = [] for k in range(degj * return_derivatives + 1): degder = hashI(k, dim) derdiff = [x - y for (x, y) in zip(deg, degder)] if all([d >= 0 for d in derdiff]): mult = multibinom(deg, degder) if np.sum(derdiff) == 0: exprLoc = (mult,) else: exprLoc = ("prod", {"axis" : 1}, ("data", "x", "**", derdiff)) if not np.isclose(mult, 1): - exprLoc = (mult, "*",) + exprLoc + exprLoc = exprLoc + ("*", mult,) expr += [exprLoc] else: expr += [(0.,)] if return_derivatives: expr += [None] return expr def createMonomialList(n:int, dim:int, return_derivatives : bool = False) -> List[List[TupleAny]]: derIdxs = nextDerivativeIndices([], dim, n) idxList = [] for j, der in enumerate(derIdxs): idxList += [createMonomial(der, return_derivatives)] return idxList diff --git a/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_affine.py b/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_affine.py index 2057f10..931b7d7 100644 --- a/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_affine.py +++ b/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_affine.py @@ -1,94 +1,95 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.special import binom import scipy.sparse as sp from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, Tuple, paramVal) -from rrompy.utilities.numerical import eigNonlinearDense +from rrompy.utilities.numerical import eigNonlinearDense, dot, solve from rrompy.utilities.exception_manager import RROMPyException from rrompy.parameter import checkParameter __all__ = ['heaviside2affine', 'affine2heaviside'] def heaviside2affine(c:Np2D, poles:Np1D, mu : paramVal = [], basis : str = "MONOMIAL_HEAVISIDE", sparse : bool = False) \ -> Tuple[Np2D, List[Np2D], List[Np1D]]: mu = checkParameter(mu, 1)(0, 0) n, d = len(poles), len(c) - len(poles) basisN = basis.split("_")[0] if basisN not in ["MONOMIAL", "CHEBYSHEV", "LEGENDRE"]: raise RROMPyException("Polynomial basis not recognized.") if sparse: A0 = sp.spdiags([np.concatenate((- mu - poles, np.ones(d)))], [0], n + d, n + d) A1 = sp.spdiags([np.concatenate((np.ones(n), np.zeros(d)))], [0], n + d, n + d) else: A0 = np.diag(np.concatenate((mu - poles, np.ones(d)))) A1 = np.diag(np.concatenate((np.ones(n), np.zeros(d)))) As = [A0, A1] bs = np.zeros((d, n + d), dtype = poles.dtype) bs[0, :] = 1. if d > 0: bs[0, n + 1 :] = 0. if d > 1: bs[1, n + 1] = 1. for j in range(2, d): if basisN == "MONOMIAL": bs[j, n + j] = 1. else: alpha = - 1. if basisN == "CHEBYSHEV" else 1. / j - 1. bs[:, n + j] = alpha * bs[:, n + j - 2] bs[1 :, n + j] += (1. - alpha) * bs[: -1, n + j - 1] bs = list(bs) return c.reshape(c.shape[0], -1).T, As, bs def affine2heaviside(As:ListAny, bs:ListAny, jSupp : int = 1) -> Tuple[Np2D, Np1D, str]: if jSupp != 1 and not (isinstance(jSupp, (int,)) and jSupp.upper() == "COMPANION"): raise RROMPyException(("Affine to heaviside conversion does not allow " "nonlinear eigenproblem support outside first " "block row.")) N = len(As) M = len(bs) n = As[0].shape[0] if N == 1: poles = np.empty(0, dtype = np.complex) Q = np.eye(n) else: basis = "MONOMIAL_HEAVISIDE" poles, P, Q = eigNonlinearDense(As, jSupp = jSupp, return_inverse = True) P = P[- n :, :] Q = Q[:, : n] - bEffs = np.array([Q.dot(np.linalg.solve(As[-1], b)) for b in bs]) + bEffs = np.array([dot(Q, solve(As[-1], b, np.linalg.solve, + {})) for b in bs]) if N == 1: c = bEffs else: c = np.zeros((len(poles) + M - 1, As[0].shape[1]), dtype = np.complex) for l, pl in enumerate(poles): for i in range(M): c[l, :] = pl ** i * bEffs[i, l] * P[:, l] for l in range(M - 1): for i in range(l + 1, M): - c[len(poles) + l, :] = P.dot(poles ** (i- 1 - l) * bEffs[i, :]) + c[len(poles) + l, :] = dot(P, poles ** (i- 1 - l) * bEffs[i, :]) return c, poles, basis diff --git a/rrompy/utilities/poly_fitting/moving_least_squares/moving_least_squares_interpolator.py b/rrompy/utilities/poly_fitting/moving_least_squares/moving_least_squares_interpolator.py index 5753d91..bab415d 100644 --- a/rrompy/utilities/poly_fitting/moving_least_squares/moving_least_squares_interpolator.py +++ b/rrompy/utilities/poly_fitting/moving_least_squares/moving_least_squares_interpolator.py @@ -1,145 +1,144 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.utilities.base.types import (List, ListAny, DictAny, Np1D, Np2D, paramList) -from rrompy.utilities.numerical import customPInv +from rrompy.utilities.numerical import customPInv, dot from .vander import mlsweights from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from rrompy.utilities.poly_fitting.polynomial.vander import (polyvander as pv, polyvanderTotal as pvT) from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameterList __all__ = ['MovingLeastSquaresInterpolator'] class MovingLeastSquaresInterpolator(GenericInterpolator): """HERE""" def __init__(self, other = None): if other is None: return self.support = other.support self.localProjector = other.localProjector self.localVanders = other.localVanders self.suppValues = other.suppValues self.directionalWeights = other.directionalWeights self.degree = other.degree self.npar = other.npar self.radialbasis = other.radialbasis self.polybasis = other.polybasis self.evalParams = other.evalParams self.totalDegree = other.totalDegree @property def shape(self): sh = self.suppValues.shape[1 :] if self.suppValues.ndim > 1 else 1 return sh @property def deg(self): return self.degree def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): if der is not None and np.sum(der) > 0: raise RROMPyException(("Cannot take derivatives of moving least " "squares function.")) mu = checkParameterList(mu, self.npar)[0] sh = self.shape if sh == 1: sh = tuple([]) values = np.empty((len(mu),) + sh, dtype = np.complex) for i, m in enumerate(mu): weights = mlsweights(m, self.support, self.radialbasis, directionalWeights = self.directionalWeights, nNearestNeighbor = self.evalParams["nNearestNeighbor"]) weights /= np.linalg.norm(weights) vanderLS = np.sum(self.localVanders * weights, axis = 2) - RHSLS = np.tensordot(self.localProjector * weights, - self.suppValues, 1) + RHSLS = dot(self.localProjector * weights, self.suppValues) if self.totalDegree: vanderEval, _, _ = pvT(m, self.deg[0], self.polybasis, **self.evalParams) else: vanderEval = pv(m, self.deg, self.polybasis, **self.evalParams) vanderEval = vanderEval.flatten() - values[i] = vanderEval.dot(customPInv(vanderLS).dot(RHSLS)) + values[i] = dot(vanderEval, dot(customPInv(vanderLS), RHSLS)) return values def __copy__(self): return MovingLeastSquaresInterpolator(self) def __deepcopy__(self, memo): other = MovingLeastSquaresInterpolator() (other.support, other.localProjector, other.localVanders, other.suppValues, other.directionalWeights, other.degree, other.npar, other.radialbasis, other.polybasis, other.evalParams, other.totalDegree) = copy( (self.support, self.localProjector, self.localVanders, self.suppValues, self.directionalWeights, self.degree, self.npar, self.radialbasis, self.polybasis, self.evalParams, self.totalDegree), memo) return other def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") - self.suppValues = np.tensordot(self.suppValues, A, axes = (-1, 0)) + self.suppValues = self.suppValues.dot(A) def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) if not hasattr(nleft, "__len__"): nleft = [nleft] if not hasattr(nright, "__len__"): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] + [(l, r) for l, r in zip(nleft, nright)] self.suppValues = np.pad(self.suppValues, padwidth, "constant", constant_values = (0., 0.)) def setupByInterpolation(self, support:paramList, values:ListAny, deg:int, polybasis : str = "MONOMIAL_GAUSSIAN", directionalWeights : Np1D = None, totalDegree : bool = True, vanderCoeffs : DictAny = {}): support = checkParameterList(support)[0] self.support = copy(support) if "reorder" in vanderCoeffs.keys(): self.support = self.support[vanderCoeffs["reorder"]] if "nNearestNeighbor" not in vanderCoeffs.keys(): vanderCoeffs["nNearestNeighbor"] = -1 self.npar = support.shape[1] if directionalWeights is None: directionalWeights = np.ones(self.npar) self.directionalWeights = directionalWeights self.polybasis, self.radialbasis, _ = polybasis.split("_") self.totalDegree = totalDegree self.evalParams = vanderCoeffs if totalDegree: vander, _, _ = pvT(support, deg, self.polybasis, **vanderCoeffs) if not hasattr(deg, "__len__"): deg = [deg] * self.npar else: if not hasattr(deg, "__len__"): deg = [deg] * self.npar vander = pv(support, deg, self.polybasis, **vanderCoeffs) self.degree = deg self.localProjector = vander.T.conj() self.localVanders = np.array([np.outer(van, van.conj()) \ for van in vander]) self.localVanders = np.swapaxes(self.localVanders, 0, 2) self.suppValues = np.array(values) diff --git a/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py b/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py index 0d3c748..3b46e64 100644 --- a/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py +++ b/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py @@ -1,131 +1,131 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.utilities.base.types import (List, ListAny, DictAny, Np1D, Np2D, paramList) from rrompy.utilities.base import freepar as fp from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from rrompy.utilities.poly_fitting.custom_fit import customFit from .base import polyfitname from .val import polyval from .roots import polyroots from .vander import polyvander as pv, polyvanderTotal as pvT -from rrompy.utilities.numerical import degreeTotalToFull +from rrompy.utilities.numerical import degreeTotalToFull, dot from rrompy.utilities.exception_manager import RROMPyAssert, RROMPyException from rrompy.parameter import checkParameterList __all__ = ['PolynomialInterpolator'] class PolynomialInterpolator(GenericInterpolator): """HERE""" def __init__(self, other = None): if other is None: return self.coeffs = other.coeffs self.npar = other.npar self.polybasis = other.polybasis @property def shape(self): if self.coeffs.ndim > self.npar: sh = self.coeffs.shape[self.npar :] else: sh = tuple([1]) return sh @property def deg(self): return [x - 1 for x in self.coeffs.shape[: self.npar]] def __getitem__(self, key): return self.coeffs[key] def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): return polyval(mu, self.coeffs, self.polybasis, der, scl) def __copy__(self): return PolynomialInterpolator(self) def __deepcopy__(self, memo): other = PolynomialInterpolator() other.coeffs, other.npar, other.polybasis = copy( (self.coeffs, self.npar, self.polybasis), memo) return other def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) if not hasattr(nleft, "__len__"): nleft = [nleft] if not hasattr(nright, "__len__"): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] * self.npar padwidth = padwidth + [(l, r) for l, r in zip(nleft, nright)] self.coeffs = np.pad(self.coeffs, padwidth, "constant", constant_values = (0., 0.)) def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") - self.coeffs = np.tensordot(self.coeffs, A, axes = (-1, 0)) + self.coeffs = dot(self.coeffs, A) def setupByInterpolation(self, support:paramList, values:ListAny, deg:int, polybasis : str = "MONOMIAL", verbose : bool = True, totalDegree : bool = True, vanderCoeffs : DictAny = {}, fitCoeffs : DictAny = {}): support = checkParameterList(support)[0] self.npar = support.shape[1] self.polybasis = polybasis if totalDegree: vander, _, reorder = pvT(support, deg, basis = polybasis, **vanderCoeffs) vander = vander[:, reorder] else: if not hasattr(deg, "__len__"): deg = [deg] * self.npar vander = pv(support, deg, basis = polybasis, **vanderCoeffs) outDim = values.shape[1:] values = values.reshape(values.shape[0], -1) fitOut = customFit(vander, values, full = True, **fitCoeffs) P = fitOut[0] if verbose: msg = ("Fitting {} samples with degree {} through {}... " "Conditioning of LS system: {:.4e}.").format( len(vander), deg, polyfitname(self.polybasis), fitOut[1][2][0] / fitOut[1][2][-1]) else: msg = None if totalDegree: self.coeffs = degreeTotalToFull(tuple([deg + 1] * self.npar) + outDim, self.npar, P) else: self.coeffs = P.reshape(tuple([d + 1 for d in deg]) + outDim) return fitOut[1][1] == vander.shape[1], msg def roots(self, marginalVals : ListAny = [fp]): RROMPyAssert(self.shape, (1,), "Shape of output") RROMPyAssert(len(marginalVals), self.npar, "Number of parameters") try: rDim = marginalVals.index(fp) if rDim < len(marginalVals) - 1 and fp in marginalVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) return polyroots(self.coeffs, self.polybasis, marginalVals) diff --git a/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py b/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py index 1a1b74d..e1e0023 100644 --- a/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py +++ b/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py @@ -1,147 +1,147 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.utilities.base.types import (List, ListAny, DictAny, Np1D, Np2D, paramList) from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from rrompy.utilities.poly_fitting.custom_fit import customFit from .base import polyfitname from .val import polyval from .vander import polyvander as pv, polyvanderTotal as pvT -from rrompy.utilities.numerical import degreeTotalToFull +from rrompy.utilities.numerical import degreeTotalToFull, dot from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameterList __all__ = ['RadialBasisInterpolator'] class RadialBasisInterpolator(GenericInterpolator): """HERE""" def __init__(self, other = None): if other is None: return self.support = other.support self.coeffsGlobal = other.coeffsGlobal self.coeffsLocal = other.coeffsLocal self.directionalWeights = other.directionalWeights self.npar = other.npar self.polybasis = other.polybasis self.nNearestNeighbor = other.nNearestNeighbor @property def shape(self): sh = self.coeffsLocal.shape[1 :] if self.coeffsLocal.ndim > 1 else 1 return sh @property def deg(self): return [x - 1 for x in self.coeffsGlobal.shape[: self.npar]] def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): if der is not None and np.sum(der) > 0: raise RROMPyException(("Cannot take derivatives of radial basis " "function.")) return polyval(mu, self.coeffsGlobal, self.coeffsLocal, self.support, self.directionalWeights, self.polybasis, self.nNearestNeighbor) def __copy__(self): return RadialBasisInterpolator(self) def __deepcopy__(self, memo): other = RadialBasisInterpolator() (other.support, other.coeffsGlobal, other.coeffsLocal, other.directionalWeights, other.npar, other.polybasis, other.nNearestNeighbor) = copy( (self.support, self.coeffsGlobal, self.coeffsLocal, self.directionalWeights, self.npar, self.polybasis, self.nNearestNeighbor), memo) return other def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") - self.coeffsLocal = np.tensordot(self.coeffsLocal, A, axes = (-1, 0)) - self.coeffsGlobal = np.tensordot(self.coeffsGlobal, A, axes = (-1, 0)) + self.coeffsLocal = dot(self.coeffsLocal, A) + self.coeffsGlobal = dot(self.coeffsGlobal, A) def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) if not hasattr(nleft, "__len__"): nleft = [nleft] if not hasattr(nright, "__len__"): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] + [(l, r) for l, r in zip(nleft, nright)] self.coeffsLocal = np.pad(self.coeffsLocal, padwidth, "constant", constant_values = (0., 0.)) padwidth = [(0, 0)] * (self.npar - 1) + padwidth self.coeffsGlobal = np.pad(self.coeffsGlobal, padwidth, "constant", constant_values = (0., 0.)) def setupByInterpolation(self, support:paramList, values:ListAny, deg:int, polybasis : str = "MONOMIAL_GAUSSIAN", directionalWeights : Np1D = None, verbose : bool = True, totalDegree : bool = True, vanderCoeffs : DictAny = {}, fitCoeffs : DictAny = {}): support = checkParameterList(support)[0] self.support = copy(support) if "reorder" in vanderCoeffs.keys(): self.support = self.support[vanderCoeffs["reorder"]] if "nNearestNeighbor" in vanderCoeffs.keys(): self.nNearestNeighbor = vanderCoeffs["nNearestNeighbor"] else: self.nNearestNeighbor = -1 self.npar = support.shape[1] if directionalWeights is None: directionalWeights = np.ones(self.npar) self.directionalWeights = directionalWeights self.polybasis = polybasis if totalDegree: vander, _, reorder = pvT(support, deg, basis = polybasis, directionalWeights = self.directionalWeights, **vanderCoeffs) vander = vander[reorder] vander = vander[:, reorder] else: if not hasattr(deg, "__len__"): deg = [deg] * self.npar vander = pv(support, deg, basis = polybasis, directionalWeights = self.directionalWeights, **vanderCoeffs) outDim = values.shape[1:] values = values.reshape(values.shape[0], -1) values = np.pad(values, ((0, len(vander) - len(values)), (0, 0)), "constant") fitOut = customFit(vander, values, full = True, **fitCoeffs) P = fitOut[0][len(support) :] if verbose: msg = ("Fitting {}+{} samples with degree {} through {}... " "Conditioning of LS system: {:.4e}.").format( len(support), len(vander) - len(support), deg, polyfitname(self.polybasis), fitOut[1][2][0] / fitOut[1][2][-1]) else: msg = None self.coeffsLocal = fitOut[0][: len(support)] if totalDegree: self.coeffsGlobal = degreeTotalToFull(tuple([deg + 1] * self.npar) + outDim, self.npar, P) else: self.coeffsGlobal = P.reshape(tuple([d + 1 for d in deg]) + outDim) return fitOut[1][1] == vander.shape[1], msg