diff --git a/rrompy/hfengines/base/hfengine_base.py b/rrompy/hfengines/base/hfengine_base.py index 7f128ea..d90d31d 100644 --- a/rrompy/hfengines/base/hfengine_base.py +++ b/rrompy/hfengines/base/hfengine_base.py @@ -1,316 +1,317 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np import scipy.sparse as scsp from numbers import Number +from collections.abc import Iterable from copy import copy as softcopy from rrompy.utilities.base.decorators import nonaffine_construct from rrompy.utilities.base.types import (Np1D, Np2D, List, DictAny, paramVal, paramList, sampList) from rrompy.utilities.numerical import solve as tsolve, dot, pseudoInverse from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.sampling.sample_list import sampleList from rrompy.parameter import (checkParameter, checkParameterList, parameterList, parameterMap as pMap) from rrompy.solver.linear_solver import setupSolver from rrompy.utilities.parallel import (poolRank, masterCore, listScatter, matrixGatherv, isend, recv) __all__ = ['HFEngineBase'] class HFEngineBase: """Generic solver for parametric problems.""" def __init__(self, verbosity : int = 10, timestamp : bool = True): self.verbosity = verbosity self.timestamp = timestamp self.setSolver("SPSOLVE", {"use_umfpack" : False}) self.npar = 0 self._C = None self.outputNormMatrix = 1. def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def __dir_base__(self): return [x for x in self.__dir__() if x[:2] != "__"] def __deepcopy__(self, memo): return softcopy(self) @property def npar(self): """Value of npar.""" return self._npar @npar.setter def npar(self, npar): nparOld = self._npar if hasattr(self, "_npar") else -1 if npar != nparOld: self.parameterMap = pMap(1., npar) self._npar = npar @property def spacedim(self): return 1 def checkParameter(self, mu:paramVal) -> paramVal: muP = checkParameter(mu, self.npar) if self.npar == 0: muP.reset((1, 0), muP.dtype) return muP def checkParameterList(self, mu:paramList, check_if_single : bool = False) -> paramList: muL = checkParameterList(mu, self.npar, check_if_single) return muL def mapParameterList(self, mu:paramList, direct : str = "F", idx : List[int] = None) -> paramList: if idx is None: idx = np.arange(self.npar) muMapped = checkParameterList(mu, len(idx)) for j, d in enumerate(idx): muMapped.data[:, j] = expressionEvaluator( self.parameterMap[direct][d], muMapped(j)).flatten() return muMapped def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ self.energyNormMatrix = 1. def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ self.energyNormDualMatrix = 1. def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False, dual : bool = False, is_state : bool = True) -> Np2D: """Scalar product.""" if is_state or self.isCEye: if dual: if not hasattr(self, "energyNormDualMatrix"): self.buildEnergyNormDualForm() energyMat = self.energyNormDualMatrix else: if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() energyMat = self.energyNormMatrix else: energyMat = self.outputNormMatrix if isinstance(u, (parameterList, sampleList)): u = u.data if isinstance(v, (parameterList, sampleList)): v = v.data if onlyDiag: return np.sum(dot(energyMat, u) * v.conj(), axis = 0) return dot(dot(energyMat, u).T, v.conj()).T def norm(self, u:Np2D, dual : bool = False, is_state : bool = True) -> Np1D: return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual, is_state = is_state)) ** .5 def baselineA(self): """Return 0 of shape consistent with operator of linear system.""" - if (hasattr(self, "As") and hasattr(self.As, "__len__") + if (hasattr(self, "As") and isinstance(self.As, Iterable) and self.As[0] is not None): d = self.As[0].shape[0] else: d = self.spacedim return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)), shape = (d, d), dtype = np.complex) def baselineb(self): """Return 0 of shape consistent with RHS of linear system.""" return np.zeros(self.spacedim, dtype = np.complex) @nonaffine_construct @abstractmethod def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ return @nonaffine_construct @abstractmethod def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ return @property def C(self): """Value of C.""" if self._C is None: self._C = 1. return self._C @property def isCEye(self): return isinstance(self.C, Number) def applyC(self, u:sampList): """Apply LHS of linear system.""" return dot(self.C, u) def applyCpInv(self, u:sampList): """Apply pseudoinverse of LHS of linear system.""" return dot(pseudoInverse(self.C), u) def setSolver(self, solverType:str, solverArgs : DictAny = {}): """Choose solver type and parameters.""" self._solver, self._solverArgs = setupSolver(solverType, solverArgs) def solve(self, mu : paramList = [], RHS : sampList = None, return_state : bool = False) -> sampList: """ Find solution of linear system. Args: mu: parameter value. RHS: RHS of linear system. If None, defaults to that of parametric system. Defaults to None. return_state: whether to return state before multiplication by c. Defaults to False. """ from rrompy.sampling import sampleList, emptySampleList if mu == []: mu = self.mu0 mu = self.checkParameterList(mu) if len(mu) == 0: return emptySampleList() mu, idx, sizes = listScatter(mu, return_sizes = True) mu = self.checkParameterList(mu) req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(mu) == 0: uL, uT = recv(source = 0, tag = poolRank()) sol = np.empty((uL, 0), dtype = uT) else: if RHS is None: RHS = sampleList([self.b(m) for m in mu]) else: RHS = sampleList(RHS) if len(RHS) > 1: RHS = sampleList([RHS[i] for i in idx]) mult = 0 if len(RHS) == 1 else 1 RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size") for j, mj in enumerate(mu): u = tsolve(self.A(mj), RHS[mult * j], self._solver, self._solverArgs) if j == 0: sol = np.empty((len(u), len(mu)), dtype = u.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(u), u.dtype), dest = dest, tag = dest)] sol[:, j] = u if not return_state: sol = self.applyC(sol) for r in req: r.wait() return sampleList(matrixGatherv(sol, sizes)) def residual(self, mu : paramList = [], u : sampList = None, post_c : bool = True) -> sampList: """ Find residual of linear system for given approximate solution. Args: mu: parameter value. u: numpy complex array with function dofs. If None, set to 0. post_c: whether to post-process using c. Defaults to True. """ from rrompy.sampling import sampleList, emptySampleList if mu == []: mu = self.mu0 mu = self.checkParameterList(mu) if len(mu) == 0: return emptySampleList() mu, idx, sizes = listScatter(mu, return_sizes = True) mu = self.checkParameterList(mu) req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(mu) == 0: uL, uT = recv(source = 0, tag = poolRank()) res = np.empty((uL, 0), dtype = uT) else: v = sampleList(np.zeros((self.spacedim, len(mu)))) if u is not None: u = sampleList(u) v = v + sampleList([u[i] for i in idx]) for j, (mj, vj) in enumerate(zip(mu, v)): r = self.b(mj) - dot(self.A(mj), vj) if j == 0: res = np.empty((len(r), len(mu)), dtype = r.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(r), r.dtype), dest = dest, tag = dest)] res[:, j] = r if post_c: res = self.applyC(res) for r in req: r.wait() return sampleList(matrixGatherv(res, sizes)) cutOffPolesRMax,cutOffPolesRMin = np.inf, - np.inf cutOffPolesRMaxRel, cutOffPolesRMinRel = np.inf, - np.inf cutOffPolesIMax, cutOffPolesIMin = np.inf, - np.inf cutOffPolesIMaxRel, cutOffPolesIMinRel = np.inf, - np.inf cutOffResNormMin = -1 def flagBadPolesResidues(self, poles:Np1D, residues : Np1D = None, relative : bool = False) -> Np1D: """ Flag (numerical) poles/residues which are impossible. Args: poles: poles to be judged. residues: residues to be judged. relative: whether relative values should be used for poles. """ poles = np.array(poles).flatten() flag = np.zeros(len(poles), dtype = bool) if residues is None: self._ignoreResidues = self.cutOffResNormMin <= 0. if relative: RMax, RMin = self.cutOffPolesRMaxRel, self.cutOffPolesRMinRel IMax, IMin = self.cutOffPolesIMaxRel, self.cutOffPolesIMinRel else: RMax, RMin = self.cutOffPolesRMax, self.cutOffPolesRMin IMax, IMin = self.cutOffPolesIMax, self.cutOffPolesIMin if not np.isinf(RMax): flag = np.logical_or(flag, np.real(poles) > RMax) if not np.isinf(RMin): flag = np.logical_or(flag, np.real(poles) < RMin) if not np.isinf(IMax): flag = np.logical_or(flag, np.imag(poles) > IMax) if not np.isinf(IMin): flag = np.logical_or(flag, np.imag(poles) < IMin) else: residues = np.array(residues).reshape(len(poles), -1) if self.cutOffResNormMin > 0.: if residues.shape[1] == self.spacedim: resEff = self.norm(residues.T) else: resEff = np.linalg.norm(residues, axis = 1) resEff /= np.max(resEff) flag = np.logical_or(flag, resEff < self.cutOffResNormMin) return flag diff --git a/rrompy/hfengines/base/linear_affine_engine.py b/rrompy/hfengines/base/linear_affine_engine.py index 3bacc52..6cb6d17 100644 --- a/rrompy/hfengines/base/linear_affine_engine.py +++ b/rrompy/hfengines/base/linear_affine_engine.py @@ -1,197 +1,198 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np import scipy.sparse as scsp +from collections.abc import Iterable from copy import deepcopy as copy from .hfengine_base import HFEngineBase from rrompy.utilities.base.decorators import affine_construct from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, TupleAny, paramVal) from rrompy.utilities.expression import (expressionEvaluator, createMonomial, createMonomialList) from rrompy.utilities.numerical.hash_derivative import ( hashDerivativeToIdx as hashD) from rrompy.utilities.exception_manager import RROMPyException __all__ = ['LinearAffineEngine', 'checkIfAffine'] class LinearAffineEngine(HFEngineBase): """Generic solver for affine parametric problems.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._affinePoly = True self.nAs, self.nbs = 1, 1 @property def affinePoly(self): return self._affinePoly @property def nAs(self): """Value of nAs.""" return self._nAs @nAs.setter def nAs(self, nAs): nAsOld = self._nAs if hasattr(self, "_nAs") else -1 if nAs != nAsOld: self._nAs = nAs self.resetAs() @property def nbs(self): """Value of nbs.""" return self._nbs @nbs.setter def nbs(self, nbs): nbsOld = self._nbs if hasattr(self, "_nbs") else -1 if nbs != nbsOld: self._nbs = nbs self.resetbs() @property def spacedim(self): - if (hasattr(self, "bs") and hasattr(self.bs, "__len__") + if (hasattr(self, "bs") and isinstance(self.bs, Iterable) and self.bs[0] is not None): return len(self.bs[0]) return super().spacedim def getMonomialSingleWeight(self, deg:List[int]): return createMonomial(deg, True) def getMonomialWeights(self, n:int): return createMonomialList(n, self.npar, True) def setAs(self, As:List[Np2D]): """Assign terms of operator of linear system.""" if len(As) != self.nAs: raise RROMPyException(("Expected number {} of terms of As not " "matching given list length {}.").format(self.nAs, len(As))) self.As = [copy(A) for A in As] def setthAs(self, thAs:List[List[TupleAny]]): """Assign terms of operator of linear system.""" if len(thAs) != self.nAs: raise RROMPyException(("Expected number {} of terms of thAs not " "matching given list length {}.").format(self.nAs, len(thAs))) self.thAs = copy(thAs) def setbs(self, bs:List[Np1D]): """Assign terms of RHS of linear system.""" if len(bs) != self.nbs: raise RROMPyException(("Expected number {} of terms of bs not " "matching given list length {}.").format(self.nbs, len(bs))) self.bs = [copy(b) for b in bs] def setthbs(self, thbs:List[List[TupleAny]]): """Assign terms of RHS of linear system.""" if len(thbs) != self.nbs: raise RROMPyException(("Expected number {} of terms of thbs not " "matching given list length {}.").format(self.nbs, len(thbs))) self.thbs = copy(thbs) def resetAs(self): """Reset (derivatives of) operator of linear system.""" if hasattr(self, "_nAs"): self.setAs([None] * self.nAs) self.setthAs([None] * self.nAs) def resetbs(self): """Reset (derivatives of) RHS of linear system.""" if hasattr(self, "_nbs"): self.setbs([None] * self.nbs) self.setthbs([None] * self.nbs) def _assembleObject(self, mu:paramVal, objs:ListAny, th:ListAny, derI:int) -> Np2D: """Assemble (derivative of) object from list of derivatives.""" muE = self.mapParameterList(mu) obj = None for j in range(len(objs)): if len(th[j]) <= derI and th[j][-1] is not None: raise RROMPyException(("Cannot assemble operator. Non enough " "derivatives of theta provided.")) if len(th[j]) > derI and th[j][derI] is not None: expr = expressionEvaluator(th[j][derI], muE) - if hasattr(expr, "__len__"): + if isinstance(expr, Iterable): if len(expr) > 1: raise RROMPyException(("Size mismatch in value of " "theta function. Only scalars " "allowed.")) expr = expr[0] if obj is None: obj = expr * objs[j] else: obj = obj + expr * objs[j] return obj @abstractmethod def buildA(self): """Build terms of operator of linear system.""" if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs) if self.As[0] is None: self.As[0] = scsp.eye(self.spacedim, dtype = np.complex, format = "csr") for j in range(1, self.nAs): if self.As[j] is None: self.As[j] = self.baselineA() @affine_construct def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ - derI = hashD(der) if hasattr(der, "__len__") else der + derI = hashD(der) if isinstance(der, Iterable) else der if derI < 0 or derI > self.nAs - 1: return self.baselineA() self.buildA() assembledA = self._assembleObject(mu, self.As, self.thAs, derI) if assembledA is None: return self.baselineA() return assembledA @abstractmethod def buildb(self): """Build terms of RHS of linear system.""" if self.thbs[0] is None: self.thbs = self.getMonomialWeights(self.nbs) for j in range(self.nbs): if self.bs[j] is None: self.bs[j] = self.baselineb() @affine_construct def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ - derI = hashD(der) if hasattr(der, "__len__") else der + derI = hashD(der) if isinstance(der, Iterable) else der if derI < 0 or derI > self.nbs - 1: return self.baselineb() self.buildb() assembledb = self._assembleObject(mu, self.bs, self.thbs, derI) if assembledb is None: return self.baselineb() return assembledb def checkIfAffine(engine, msg : str = "apply method", noA : bool = False): msg = ("Cannot {} because of non-affine parametric dependence{}. Consider " "using DEIM to define a new engine.").format(msg, " of RHS" * noA) if (not (hasattr(engine.b, "is_affine") and engine.b.is_affine) or not (noA or (hasattr(engine.A, "is_affine") and engine.A.is_affine))): raise RROMPyException(msg) diff --git a/rrompy/hfengines/fenics_engines/helmholtz_problem_engine_augmented.py b/rrompy/hfengines/fenics_engines/helmholtz_problem_engine_augmented.py index b682999..222c0e2 100755 --- a/rrompy/hfengines/fenics_engines/helmholtz_problem_engine_augmented.py +++ b/rrompy/hfengines/fenics_engines/helmholtz_problem_engine_augmented.py @@ -1,267 +1,268 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from numpy import pad from scipy.sparse import eye, bmat, block_diag +from collections.abc import Iterable from .helmholtz_problem_engine import (HelmholtzProblemEngine, ScatteringProblemEngine) from rrompy.solver.fenics import (augmentedH1NormMatrix, augmentedHminus1NormMatrix) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyException from rrompy.parameter import parameterMap as pMap __all__ = ['HelmholtzProblemEngineAugmented', 'ScatteringProblemEngineAugmented'] class HelmholtzProblemEngineAugmented(HelmholtzProblemEngine): """ Solver for generic Helmholtz problems with parametric wavenumber. - \nabla \cdot (a \nabla u) - omega * n**2 * v = f in \Omega omega * u = v in \overline{\Omega} u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu + h u = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real FE space. u: Generic trial functions for variational form evaluation. v: Generic test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. omega: Value of omega. diffusivity: Value of a. refractionIndex: Value of n. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. RobinDatumH: Value of h. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.parameterMap = pMap(1., self.npar) @property def spacedim(self): - if (hasattr(self, "bs") and hasattr(self.bs, "__len__") + if (hasattr(self, "bs") and isinstance(self.bs, Iterable) and self.bs[0] is not None): return len(self.bs[0]) return 2 * super().spacedim def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = augmentedH1NormMatrix(self.V) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ vbMng(self, "INIT", "Assembling energy dual matrix.", 20) self.energyNormDualMatrix = augmentedHminus1NormMatrix(self.V, compressRank = self._energyDualNormCompress) vbMng(self, "DEL", "Done assembling energy dual matrix.", 20) def buildA(self): """Build terms of operator of linear system.""" ANone = any([A is None for A in self.As]) if not ANone: return self.nAs = 2 super().buildA() I = eye(self.spacedim // 2) self.As[0] = block_diag((self.As[0], I), format = "csr") self.As[1] = bmat([[None, self.As[1]], [- I, None]], format = "csr") def buildb(self): """Build terms of operator of linear system.""" bNone = any([b is None for b in self.bs]) if not bNone: return self.nbs = 1 dim = self.spacedim // 2 super().buildb() self.bs[0] = pad(self.bs[0], (0, dim), "constant") def plot(self, u, warping = None, is_state = False, name = "u", save = None, what = 'all', forceNewFile = True, saveFormat = "eps", saveDPI = 100, show = True, colorMap = "jet", fenplotArgs = {}, **figspecs): uh = u[: self.spacedim // 2] if is_state or self.isCEye else u return super().plot(uh, warping, is_state, name, save, what, forceNewFile, saveFormat, saveDPI, show, colorMap, fenplotArgs, **figspecs) def outParaview(self, u, warping = None, is_state = False, name = "u", filename = "out", time = 0., what = 'all', forceNewFile = True, folder = False, filePW = None): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaview(u[: self.spacedim // 2], warping, is_state, name, filename, time, what, forceNewFile, folder, filePW) def outParaviewTimeDomain(self, u, omega, warping = None, is_state = False, timeFinal = None, periodResolution = 20, name = "u", filename = "out", forceNewFile = True, folder = False): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaviewTimeDomain(u[: self.spacedim // 2], omega, warping, is_state, timeFinal, periodResolution, name, filename, forceNewFile, folder) class ScatteringProblemEngineAugmented(ScatteringProblemEngine): """ Solver for scattering problems with parametric wavenumber. - \nabla \cdot (a \nabla u) - omega * n**2 * v = f in \Omega omega * u = v in \overline{\Omega} u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu +- i v = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real FE space. u: Generic trial functions for variational form evaluation. v: Generic test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. signR: Sign in ABC. omega: Value of omega. diffusivity: Value of a. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.nAs = 2 self._weight0 = 1. @property def spacedim(self): - if (hasattr(self, "bs") and hasattr(self.bs, "__len__") + if (hasattr(self, "bs") and isinstance(self.bs, Iterable) and self.bs[0] is not None): return len(self.bs[0]) return 2 * super().spacedim def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = augmentedH1NormMatrix(self.V) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ vbMng(self, "INIT", "Assembling energy dual matrix.", 20) self.energyNormDualMatrix = augmentedHminus1NormMatrix(self.V, compressRank = self._energyDualNormCompress) vbMng(self, "DEL", "Done assembling energy dual matrix.", 20) def buildA(self): """Build terms of operator of linear system.""" ANone = any([A is None for A in self.As]) if not ANone: return self.nAs = 3 super().buildA() self._nAs = 2 I = eye(self.spacedim // 2) self.As[0] = bmat([[self.As[0], self._weight0 * self.As[1]], [None, I]], format = "csr") self.As[1] = bmat([[(1. - self._weight0) * self.As[1], self.As[2]], [- I, None]], format = "csr") self.thAs.pop() self.As.pop() def buildb(self): """Build terms of operator of linear system.""" bNone = any([b is None for b in self.bs]) if not bNone: return self.nbs = 1 dim = self.spacedim // 2 super().buildb() self.bs[0] = pad(self.bs[0], (0, dim), "constant") def plot(self, u, warping = None, is_state = False, name = "u", save = None, what = 'all', forceNewFile = True, saveFormat = "eps", saveDPI = 100, show = True, colorMap = "jet", fenplotArgs = {}, **figspecs): uh = u[: self.spacedim // 2] if is_state or self.isCEye else u return super().plot(uh, warping, is_state, name, save, what, forceNewFile, saveFormat, saveDPI, show, colorMap, fenplotArgs, **figspecs) def outParaview(self, u, warping = None, is_state = False, name = "u", filename = "out", time = 0., what = 'all', forceNewFile = True, folder = False, filePW = None): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaview(u[: self.spacedim // 2], warping, is_state, name, filename, time, what, forceNewFile, folder, filePW) def outParaviewTimeDomain(self, u, omega, warping = None, is_state = False, timeFinal = None, periodResolution = 20, name = "u", filename = "out", forceNewFile = True, folder = False): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaviewTimeDomain(u[: self.spacedim // 2], omega, warping, is_state, timeFinal, periodResolution, name, filename, forceNewFile, folder) diff --git a/rrompy/hfengines/fenics_engines/linear_elasticity_helmholtz_problem_engine_augmented.py b/rrompy/hfengines/fenics_engines/linear_elasticity_helmholtz_problem_engine_augmented.py index 75ae923..cff2ee1 100755 --- a/rrompy/hfengines/fenics_engines/linear_elasticity_helmholtz_problem_engine_augmented.py +++ b/rrompy/hfengines/fenics_engines/linear_elasticity_helmholtz_problem_engine_augmented.py @@ -1,280 +1,281 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.sparse import eye, bmat, block_diag +from collections.abc import Iterable from .linear_elasticity_helmholtz_problem_engine import ( LinearElasticityHelmholtzProblemEngine, LinearElasticityHelmholtzProblemEngineDamped) from rrompy.solver.fenics import (augmentedElasticNormMatrix, augmentedElasticDualNormMatrix) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyException from rrompy.parameter import parameterMap as pMap __all__ = ['LinearElasticityHelmholtzProblemEngineAugmented', 'LinearElasticityHelmholtzProblemEngineDampedAugmented'] class LinearElasticityHelmholtzProblemEngineAugmented( LinearElasticityHelmholtzProblemEngine): """ Solver for generic linear elasticity Helmholtz problems with parametric wavenumber. - div(lambda_ * div(u) * I + 2 * mu_ * epsilon(u)) - rho_ * mu * v = f in \Omega mu * u = v in \overline{\Omega} u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu + h u = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real vector FE space. u: Generic vector trial functions for variational form evaluation. v: Generic vector test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. omega: Value of omega. lambda_: Value of lambda_. mu_: Value of mu_. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. RobinDatumH: Value of h. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.parameterMap = pMap(1., self.npar) @property def spacedim(self): - if (hasattr(self, "bs") and hasattr(self.bs, "__len__") + if (hasattr(self, "bs") and isinstance(self.bs, Iterable) and self.bs[0] is not None): return len(self.bs[0]) return 2 * super().spacedim def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = augmentedElasticNormMatrix(self.V, self.lambda_[0], self.mu_[0]) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ vbMng(self, "INIT", "Assembling energy dual matrix.", 20) self.energyNormDualMatrix = augmentedElasticDualNormMatrix( self.V, self.lambda_[0], self.mu_[0], compressRank = self._energyDualNormCompress) vbMng(self, "DEL", "Done assembling energy dual matrix.", 20) def buildA(self): """Build terms of operator of linear system.""" ANone = any([A is None for A in self.As]) if not ANone: return self.nAs = 2 super().buildA() I = eye(self.spacedim // 2) self.As[0] = block_diag((self.As[0], I), format = "csr") self.As[1] = bmat([[None, self.As[1]], [- I, None]], format = "csr") def buildb(self): """Build terms of operator of linear system.""" bNone = any([b is None for b in self.bs]) if not bNone: return self.nbs = 1 dim = self.spacedim // 2 super().buildb() self.bs[0] = np.pad(self.bs[0], (0, dim), "constant") def plot(self, u, warping = None, is_state = False, name = "u", save = None, what = 'all', forceNewFile = True, saveFormat = "eps", saveDPI = 100, show = True, colorMap = "jet", fenplotArgs = {}, **figspecs): uh = u[: self.spacedim // 2] if is_state or self.isCEye else u return super().plot(uh, warping, is_state, name, save, what, forceNewFile, saveFormat, saveDPI, show, colorMap, fenplotArgs, **figspecs) def outParaview(self, u, warping = None, is_state = False, name = "u", filename = "out", time = 0., what = 'all', forceNewFile = True, folder = False, filePW = None): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaview(u[: self.spacedim // 2], warping, is_state, name, filename, time, what, forceNewFile, folder, filePW) def outParaviewTimeDomain(self, u, omega, warping = None, is_state = False, timeFinal = None, periodResolution = 20, name = "u", filename = "out", forceNewFile = True, folder = False): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaviewTimeDomain(u[: self.spacedim // 2], omega, warping, is_state, timeFinal, periodResolution, name, filename, forceNewFile, folder) class LinearElasticityHelmholtzProblemEngineDampedAugmented( LinearElasticityHelmholtzProblemEngineDamped): """ Solver for generic linear elasticity Helmholtz problems with parametric wavenumber. - div(lambda_ * div(u) * I + 2 * mu_ * epsilon(u)) - rho_ * (mu - i * eta) * v = f in \Omega mu * u = v in \overline{\Omega} u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu + h u = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real vector FE space. u: Generic vector trial functions for variational form evaluation. v: Generic vector test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. omega: Value of omega. lambda_: Value of lambda_. mu_: Value of mu_. eta: Value of eta. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. RobinDatumH: Value of h. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.nAs = 2 self._weight0 = 1. @property def spacedim(self): - if (hasattr(self, "bs") and hasattr(self.bs, "__len__") + if (hasattr(self, "bs") and isinstance(self.bs, Iterable) and self.bs[0] is not None): return len(self.bs[0]) return 2 * super().spacedim def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = augmentedElasticNormMatrix(self.V, self.lambda_[0], self.mu_[0]) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ vbMng(self, "INIT", "Assembling energy dual matrix.", 20) self.energyNormDualMatrix = augmentedElasticDualNormMatrix( self.V, self.lambda_[0], self.mu_[0], compressRank = self._energyDualNormCompress) vbMng(self, "DEL", "Done assembling energy dual matrix.", 20) def buildA(self): """Build terms of operator of linear system.""" ANone = any([A is None for A in self.As]) if not ANone: return self.nAs = 3 super().buildA() self._nAs = 2 I = eye(self.spacedim // 2) self.As[0] = bmat([[self.As[0], self._weight0 * self.As[1]], [None, I]], format = "csr") self.As[1] = bmat([[(1. - self._weight0) * self.As[1], self.As[2]], [- I, None]], format = "csr") self.thAs.pop() self.As.pop() def buildb(self): """Build terms of operator of linear system.""" bNone = any([b is None for b in self.bs]) if not bNone: return self.nbs = 1 dim = self.spacedim // 2 super().buildb() self.bs[0] = np.pad(self.bs[0], (0, dim), "constant") def plot(self, u, warping = None, is_state = False, name = "u", save = None, what = 'all', forceNewFile = True, saveFormat = "eps", saveDPI = 100, show = True, colorMap = "jet", fenplotArgs = {}, **figspecs): uh = u[: self.spacedim // 2] if is_state or self.isCEye else u return super().plot(uh, warping, is_state, name, save, what, forceNewFile, saveFormat, saveDPI, show, colorMap, fenplotArgs, **figspecs) def outParaview(self, u, warping = None, is_state = False, name = "u", filename = "out", time = 0., what = 'all', forceNewFile = True, folder = False, filePW = None): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaview(u[: self.spacedim // 2], warping, is_state, name, filename, time, what, forceNewFile, folder, filePW) def outParaviewTimeDomain(self, u, omega, warping = None, is_state = False, timeFinal = None, periodResolution = 20, name = "u", filename = "out", forceNewFile = True, folder = False): if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) return super().outParaviewTimeDomain(u[: self.spacedim // 2], omega, warping, is_state, timeFinal, periodResolution, name, filename, forceNewFile, folder) diff --git a/rrompy/parameter/parameter_list.py b/rrompy/parameter/parameter_list.py index ec0ccb6..73821bb 100644 --- a/rrompy/parameter/parameter_list.py +++ b/rrompy/parameter/parameter_list.py @@ -1,234 +1,235 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from itertools import product as iterprod from copy import deepcopy as copy from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.utilities.base.types import Np2D __all__ = ['parameterList', 'emptyParameterList', 'checkParameterList'] def checkParameterList(mu, npar = None, check_if_single : bool = False, return_data : bool = False): if not isinstance(mu, (parameterList,)): mu = parameterList(mu, npar) else: if npar is not None: RROMPyAssert(mu.shape[1], npar, "Number of parameters") mu = copy(mu) if npar == 0: mu.reset((1, 0), mu.dtype) if return_data: mu = mu.data if check_if_single: return mu, len(mu) <= 1 return mu def checkParameter(mu, npar = None, return_data : bool = False): muL, wasPar = checkParameterList(mu, npar, True, return_data) if not wasPar: muL, wasPar = checkParameterList([mu], npar, True, return_data) if not wasPar: raise RROMPyException(("Only single parameter allowed. No " "parameter lists here.")) return muL def emptyParameterList(): return parameterList([[]]) def addMemberFromNumpyArray(self, fieldName): def objFunc(self, other): if not isinstance(other, (self.__class__,)): other = parameterList(other, self.shape[1]) return parameterList(getattr(np.ndarray, fieldName)(self.data, other.data)) setattr(self.__class__, fieldName, objFunc) def objIFunc(self, other): self.data = getattr(self.__class__, fieldName)(self, other).data setattr(self.__class__, "__i" + fieldName[2:], objIFunc) class parameterList: __all__ += [pre + post for pre, post in iterprod(["__", "__i"], ["add__", "sub__", "mul__", "div__", "truediv__", "floordiv__", "pow__"])] def __init__(self, data:Np2D, lengthCheck : int = None): - if not hasattr(data, "__len__"): data = [data] + if not isinstance(data, Iterable): data = [data] elif isinstance(data, (self.__class__,)): data = data.data elif isinstance(data, (tuple,)): data = list(data) if (isinstance(data, (list,)) and len(data) > 0 and isinstance(data[0], (tuple,))): data = [list(x) for x in data] self.data = np.array(data, ndmin = 1, copy = 1) if self.data.ndim == 1: self.data = self.data[:, None] if np.size(self.data) > 0: self.data = self.data.reshape((len(self), -1)) if self.shape[0] * self.shape[1] == 0: lenEff = 0 if lengthCheck is None else lengthCheck self.reset((0, lenEff), self.dtype) if lengthCheck is not None: if lengthCheck != 1 and self.shape == (lengthCheck, 1): self.data = self.data.T RROMPyAssert(self.shape[1], lengthCheck, "Number of parameters") for fieldName in ["__add__", "__sub__", "__mul__", "__div__", "__truediv__", "__floordiv__", "__pow__"]: addMemberFromNumpyArray(self, fieldName) def __len__(self): return self.shape[0] def __str__(self): if len(self) == 0: selfstr = "[]" elif len(self) <= 3: selfstr = "[{}]".format(" ".join([str(x) for x in self.data])) else: selfstr = "[{} ..({}).. {}]".format(self[0], len(self) - 2, self[-1]) return selfstr def __repr__(self): return repr(self.data) @property def shape(self): return self.data.shape @property def size(self): return self.data.size @property def re(self): return parameterList(np.real(self.data)) @property def im(self): return parameterList(np.imag(self.data)) @property def abs(self): return parameterList(np.abs(self.data)) @property def angle(self): return parameterList(np.angle(self.data)) @property def conj(self): return parameterList(np.conj(self.data)) @property def dtype(self): return self.data.dtype def __getitem__(self, key): return self.data[key] def __call__(self, key, idx = None): if idx is None: return self.data[:, key] return self[key, idx] def __setitem__(self, key, value): if isinstance(key, (tuple, list, np.ndarray)): RROMPyAssert(len(key), len(value), "Slice length") for k, val in zip(key, value): self[k] = val else: self.data[key] = value def __eq__(self, other): if not hasattr(other, "shape") or self.shape != other.shape: return False if isinstance(other, self.__class__): other = other.data return np.allclose(self.data, other) def __contains__(self, item): return next((x for x in self if np.allclose(x[0], item)), -1) != -1 def __iter__(self): return iter([parameterList([x]) for x in self.data]) def __copy__(self): return parameterList(self.data) def __deepcopy__(self, memo): return parameterList(copy(self.data, memo)) def __neg__(self): return parameterList(-self.data) def __pos__(self): return copy(self) def reset(self, size, dtype = complex): self.data = np.empty(size, dtype = dtype) self.data[:] = np.nan def insert(self, items, idx = None): if isinstance(items, self.__class__): items = items.data else: items = np.array(items, ndmin = 2) if len(self) == 0: self.data = parameterList(items).data elif idx is None: self.data = np.append(self.data, items, axis = 0) else: self.data = np.insert(self.data, idx, items, axis = 0) def append(self, items): self.insert(items) def pop(self, idx = -1): self.data = np.delete(self.data, idx, axis = 0) def find(self, item): if len(self) == 0: return None return next((j for j in range(len(self)) if np.allclose(self[j], item)), None) def findall(self, item): if len(self) == 0: return [] return [j for j in range(len(self)) if np.allclose(self[j], item)] def sort(self, overwrite = False, *args, **kwargs): dataT = np.array([tuple(x[0]) for x in self], dtype = [(str(j), self.dtype) for j in range(self.shape[1])]) sortedP = parameterList([list(x) for x in np.sort(dataT, *args, **kwargs)]) if overwrite: self.data = sortedP.data return sortedP def unique(self, overwrite = False, *args, **kwargs): dataT = np.array([tuple(x[0]) for x in self], dtype = [(str(j), self.dtype) for j in range(self.shape[1])]) uniqueT = np.unique(dataT, *args, **kwargs) if isinstance(uniqueT, (tuple,)): extraT = uniqueT[1:] uniqueT = uniqueT[0] else: extraT = () uniqueP = parameterList([list(x) for x in uniqueT]) if overwrite: self.data = uniqueP.data uniqueP = (uniqueP,) + extraT if len(uniqueP) == 1: return uniqueP[0] return uniqueP diff --git a/rrompy/parameter/parameter_sampling/shape/generic_shape_sampler.py b/rrompy/parameter/parameter_sampling/shape/generic_shape_sampler.py index c46ed25..8feebb3 100644 --- a/rrompy/parameter/parameter_sampling/shape/generic_shape_sampler.py +++ b/rrompy/parameter/parameter_sampling/shape/generic_shape_sampler.py @@ -1,48 +1,49 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from rrompy.parameter.parameter_sampling.generic_sampler import GenericSampler from rrompy.utilities.base.types import List, DictAny, paramList from rrompy.utilities.exception_manager import RROMPyAssert __all__ = ['GenericShapeSampler'] class GenericShapeSampler(GenericSampler): """Generator of sample points on boxes or ellipses.""" def __init__(self, lims:paramList, axisRatios : List[float] = None, parameterMap : DictAny = 1.): super().__init__(lims = lims, parameterMap = parameterMap) self.axisRatios = axisRatios def normalFoci(self, d : int = 0): focus = (1. + 0.j - self.axisRatios[d] ** 2.) ** .5 return [- focus, focus] @property def axisRatios(self): """Value of axisRatios.""" return self._axisRatios @axisRatios.setter def axisRatios(self, axisRatios): if axisRatios is None: axisRatios = [1.] * self.npar - if not hasattr(axisRatios, "__len__"): axisRatios = [axisRatios] + if not isinstance(axisRatios, Iterable): axisRatios = [axisRatios] RROMPyAssert(self.npar, len(axisRatios), "Number of axis ratios terms") self._axisRatios = [np.abs(x) for x in axisRatios] diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py index 871fa82..c6970dd 100644 --- a/rrompy/reduction_methods/base/generic_approximant.py +++ b/rrompy/reduction_methods/base/generic_approximant.py @@ -1,921 +1,922 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np +from collections.abc import Iterable from itertools import product as iterprod from copy import deepcopy as copy from os import remove as osrm from rrompy.sampling import SamplingEngineStandard, SamplingEngineStandardPOD from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, Tuple, ListAny, strLst, paramVal, paramList, sampList) from rrompy.utilities.base.data_structures import purgeDict, getNewFilename from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPy_READY, RROMPy_FRAGILE) from rrompy.utilities.base.pickle_utilities import pickleDump, pickleLoad from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import sampleList, emptySampleList from rrompy.utilities.parallel import (bcast, masterCore, listGather, listScatter) __all__ = ['GenericApproximant'] def addNormFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = False val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addNormDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = True if "dual" not in kwargs.keys(): kwargs["dual"] = True val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addPlotFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addPlotDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addOutParaviewFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaview"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.outParaview(u, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaview" + fieldName, objFunc) def addOutParaviewTimeDomainFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaviewTimeDomain"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: omega = args.pop(0) if len(args) > 0 else np.real(mu) if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.outParaviewTimeDomain(u, omega, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc) class GenericApproximant: """ ABSTRACT ROM approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of samples current approximant relies upon. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. trainedModel: Trained model evaluator. mu0: Default parameter. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList{Soft,Critical}. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: Number of solution snapshots over which current approximant is based upon. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ __all__ += [ftype + dtype for ftype, dtype in iterprod( ["norm", "plot", "outParaview", "outParaviewTimeDomain"], ["HF", "RHS", "Approx", "Res", "Err"])] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._mode = RROMPy_READY self.approx_state = approx_state self.verbosity = verbosity self.timestamp = timestamp vbMng(self, "INIT", "Initializing engine of type {}.".format(self.name()), 10) self._HFEngine = HFEngine self.trainedModel = None self.lastSolvedHF = emptyParameterList() self.uHF = emptySampleList() self._addParametersToList(["POD", "scaleFactorDer"], [True, "AUTO"], ["S"], [1.]) if mu0 is None: if hasattr(self.HFEngine, "mu0"): self.mu0 = checkParameter(self.HFEngine.mu0) else: raise RROMPyException(("Center of approximation cannot be " "inferred from HF engine. Parameter " "required")) else: self.mu0 = checkParameter(mu0, self.HFEngine.npar) self.resetSamples() self.approxParameters = approxParameters self._postInit() ### add norm{HF,Err} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["HF", "Err"]: addNormFieldToClass(self, objName) ### add norm{RHS,Res} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["RHS", "Res"]: addNormDualFieldToClass(self, objName) ### add plot{HF,Approx,Err} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["HF", "Approx", "Err"]: addPlotFieldToClass(self, objName) ### add plot{RHS,Res} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["RHS", "Res"]: addPlotDualFieldToClass(self, objName) ### add outParaview{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file. Args: mu: Target parameter. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewFieldToClass(self, objName) ### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file, converted to time domain. Args: mu: Target parameter. omega(optional): frequency. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewTimeDomainFieldToClass(self, objName) def _preInit(self): if not hasattr(self, "depth"): self.depth = 0 else: self.depth += 1 @property def tModelType(self): raise RROMPyException("No trainedModel type assigned.") def initializeModelData(self, datadict): from .trained_model.trained_model_data import TrainedModelData return (TrainedModelData(datadict["mu0"], datadict["mus"], datadict.pop("projMat"), datadict["scaleFactor"], datadict.pop("parameterMap")), ["mu0", "scaleFactor", "mus"]) @property def parameterList(self): """Value of parameterListSoft + parameterListCritical.""" return self.parameterListSoft + self.parameterListCritical def _addParametersToList(self, whatSoft : strLst = [], defaultSoft : ListAny = [], whatCritical : strLst = [], defaultCritical : ListAny = [], toBeExcluded : strLst = []): if not hasattr(self, "parameterToBeExcluded"): self.parameterToBeExcluded = [] self.parameterToBeExcluded = toBeExcluded + self.parameterToBeExcluded if not hasattr(self, "parameterListSoft"): self.parameterListSoft = [] if not hasattr(self, "parameterDefaultSoft"): self.parameterDefaultSoft = {} if not hasattr(self, "parameterListCritical"): self.parameterListCritical = [] if not hasattr(self, "parameterDefaultCritical"): self.parameterDefaultCritical = {} for j, what in enumerate(whatSoft): if what not in self.parameterToBeExcluded: self.parameterListSoft = [what] + self.parameterListSoft self.parameterDefaultSoft[what] = defaultSoft[j] for j, what in enumerate(whatCritical): if what not in self.parameterToBeExcluded: self.parameterListCritical = ([what] + self.parameterListCritical) self.parameterDefaultCritical[what] = defaultCritical[j] def _postInit(self): if self.depth == 0: vbMng(self, "DEL", "Done initializing.", 10) del self.depth else: self.depth -= 1 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEngineStandardPOD else: SamplingEngine = SamplingEngineStandard self.samplingEngine = SamplingEngine(self.HFEngine, sample_state = self.approx_state, verbosity = self.verbosity) self.resetSamples() @property def HFEngine(self): """Value of HFEngine.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): raise RROMPyException("Cannot change HFEngine.") @property def mu0(self): """Value of mu0.""" return self._mu0 @mu0.setter def mu0(self, mu0): mu0 = checkParameter(mu0) if not hasattr(self, "_mu0") or mu0 != self.mu0: self.resetSamples() self._mu0 = mu0 @property def npar(self): """Number of parameters.""" return self.mu0.shape[1] def checkParameterList(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.npar, check_if_single) def mapParameterList(self, *args, **kwargs): return self.HFEngine.mapParameterList(*args, **kwargs) @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): if not hasattr(self, "approxParameters"): self._approxParameters = {} approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) keyList = list(approxParameters.keys()) for key in self.parameterListCritical: if key in keyList: setattr(self, "_" + key, self.parameterDefaultCritical[key]) for key in self.parameterListSoft: if key in keyList: setattr(self, "_" + key, self.parameterDefaultSoft[key]) fragile = False for key in self.parameterListCritical: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: fragile = True val = self.parameterDefaultCritical[key] if self._mode == RROMPy_FRAGILE: setattr(self, "_" + key, val) self.approxParameters[key] = val else: getattr(self.__class__, key, None).fset(self, val) for key in self.parameterListSoft: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultSoft[key] if self._mode == RROMPy_FRAGILE: setattr(self, "_" + key, val) self.approxParameters[key] = val else: getattr(self.__class__, key, None).fset(self, val) if fragile: self._mode = RROMPy_FRAGILE @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): if hasattr(self, "_POD"): PODold = self.POD else: PODold = -1 self._POD = POD self._approxParameters["POD"] = self.POD if PODold != self.POD: self.samplingEngine = None self.resetSamples() @property def scaleFactorDer(self): """Value of scaleFactorDer.""" if self._scaleFactorDer == "NONE": return 1. if self._scaleFactorDer == "AUTO": return self.scaleFactor return self._scaleFactorDer @scaleFactorDer.setter def scaleFactorDer(self, scaleFactorDer): if isinstance(scaleFactorDer, (str,)): scaleFactorDer = scaleFactorDer.upper() - elif hasattr(scaleFactorDer, "__len__"): + elif isinstance(scaleFactorDer, Iterable): scaleFactorDer = list(scaleFactorDer) self._scaleFactorDer = scaleFactorDer self._approxParameters["scaleFactorDer"] = self._scaleFactorDer @property def scaleFactorRel(self): """Value of scaleFactorDer / scaleFactor.""" if self._scaleFactorDer == "AUTO": return None try: return np.divide(self.scaleFactorDer, self.scaleFactor) except: raise RROMPyException(("Error in computation of relative scaling " "factor. Make sure that scaleFactor is " "properly initialized.")) from None @property def approx_state(self): """Value of approx_state.""" return self._approx_state @approx_state.setter def approx_state(self, approx_state): if hasattr(self, "_approx_state"): approx_stateold = self.approx_state else: approx_stateold = -1 self._approx_state = approx_state if approx_stateold != self.approx_state: self.resetSamples() @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if S <= 0: raise RROMPyException("S must be positive.") if hasattr(self, "_S") and self._S is not None: Sold = self.S else: Sold = -1 self._S = S self._approxParameters["S"] = self.S if Sold != self.S: self.resetSamples() @property def trainedModel(self): """Value of trainedModel.""" return self._trainedModel @trainedModel.setter def trainedModel(self, trainedModel): self._trainedModel = trainedModel if self._trainedModel is not None: self._trainedModel.reset() self.lastSolvedApproxReduced = emptyParameterList() self.lastSolvedApprox = emptyParameterList() self.uApproxReduced = emptySampleList() self.uApprox = emptySampleList() def resetSamples(self): if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() else: self.setupSampling() self._mode = RROMPy_READY def plotSamples(self, *args, **kwargs) -> List[str]: """ Do some nice plots of the samples. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot plot samples.") return self.samplingEngine.plotSamples(*args, **kwargs) def outParaviewSamples(self, *args, **kwargs) -> List[str]: """ Output samples to ParaView file. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewSamples(*args, **kwargs) def outParaviewTimeDomainSamples(self, *args, **kwargs) -> List[str]: """ Output samples to ParaView file, converted to time domain. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewTimeDomainSamples(*args, **kwargs) def setTrainedModel(self, model): """Deepcopy approximation from trained model.""" if hasattr(model, "storeTrainedModel"): verb = model.verbosity model.verbosity = 0 fileOut = model.storeTrainedModel() model.verbosity = verb else: try: fileOut = getNewFilename("trained_model", "pkl") pickleDump(model.data.__dict__, fileOut) except: raise RROMPyException(("Failed to store model data. Parameter " "model must have either " "storeTrainedModel or " "data.__dict__ properties.")) from None self.loadTrainedModel(fileOut) osrm(fileOut) @abstractmethod def setupApprox(self) -> int: """ Setup approximant. (ABSTRACT) Any specialization should include something like self.trainedModel = ... self.trainedModel.data = ... self.trainedModel.data.approxParameters = copy( self.approxParameters) Returns > 0 if error was encountered, < 0 if no computation was necessary. """ if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) pass vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None and self.trainedModel.data.approxParameters == self.approxParameters and len(self.mus) == len(self.trainedModel.data.mus)) def _pruneBeforeEval(self, mu:paramList, field:str, append:bool, prune:bool) -> Tuple[paramList, Np1D, Np1D, bool]: mu = self.checkParameterList(mu) idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muExtra = emptyParameterList() lastSolvedMus = getattr(self, "lastSolved" + field) if (len(mu) > 0 and len(mu) == len(lastSolvedMus) and mu == lastSolvedMus): idx = np.arange(len(mu), dtype = np.int) return muExtra, jExtra, idx, True muKeep = copy(muExtra) for j in range(len(mu)): jPos = lastSolvedMus.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: lastSolvedu = getattr(self, "u" + field) idx[~jExtra] = getattr(self.__class__, "set" + field)(self, muKeep, lastSolvedu[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu return muExtra, jExtra, idx, append def _setObject(self, mu:paramList, field:str, object:sampList, append:bool) -> List[int]: newMus = self.checkParameterList(mu) newObj = sampleList(object) if append: getattr(self, "lastSolved" + field).append(newMus) getattr(self, "u" + field).append(newObj) Ltot = len(getattr(self, "u" + field)) return list(range(Ltot - len(newObj), Ltot)) setattr(self, "lastSolved" + field, copy(newMus)) setattr(self, "u" + field, copy(newObj)) return list(range(len(getattr(self, "u" + field)))) def setHF(self, muHF:paramList, uHF:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muHF, "HF", uHF, append) def evalHF(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Find high fidelity solution with original parameters and arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "HF", append, prune) if len(muExtra) > 0: vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15) newuHFs = self.HFEngine.solve(muExtra) vbMng(self, "DEL", "Done solving HF model.", 15) idx[jExtra] = self.setHF(muExtra, newuHFs, append) return list(idx) def setApproxReduced(self, muApproxR:paramList, uApproxR:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApproxR, "ApproxReduced", uApproxR, append) def evalApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "ApproxReduced", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApproxReduced(muExtra) idx[jExtra] = self.setApproxReduced(muExtra, newuApproxs, append) return list(idx) def setApprox(self, muApprox:paramList, uApprox:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApprox, "Approx", uApprox, append) def evalApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "Approx", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApprox(muExtra) idx[jExtra] = self.setApprox(muExtra, newuApproxs, append) return list(idx) def getHF(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get HF solution at arbitrary parameter. Args: mu: Target parameter. Returns: HFsolution. """ mu = self.checkParameterList(mu) idx = self.evalHF(mu, append = append, prune = prune) return self.uHF(idx) def getRHS(self, mu:paramList) -> sampList: """ Get linear system RHS at arbitrary parameter. Args: mu: Target parameter. Returns: Linear system RHS. """ return self.HFEngine.residual(mu, None) def getApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Reduced approximant. """ mu = self.checkParameterList(mu) idx = self.evalApproxReduced(mu, append = append, prune = prune) return self.uApproxReduced(idx) def getApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant. """ mu = self.checkParameterList(mu) idx = self.evalApprox(mu, append = append, prune = prune) return self.uApprox(idx) def getRes(self, mu:paramList) -> sampList: """ Get residual at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant residual. """ if not self.HFEngine.isCEye: raise RROMPyException(("Residual of solution with non-scalar C " "not computable.")) return self.HFEngine.residual(mu, self.getApprox(mu) / self.HFEngine.C) def getErr(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get error at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant error. """ return (self.getApprox(mu, append = append, prune =prune) - self.getHF(mu, append = append, prune = prune)) def normApprox(self, mu:paramList) -> float: """ Compute norm of approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of approximant. """ if not (self.POD and self.HFEngine.isCEye): return self.HFEngine.norm(self.getApprox(mu), is_state = False) return np.linalg.norm(self.HFEngine.applyC( self.getApproxReduced(mu).data), axis = 0) def recompressApprox(self, collapse : bool = False, tol : float = 0.): """Recompress approximant.""" self.setupApprox() vbMng(self, "INIT", "Recompressing approximant.", 20) self.trainedModel.compress(collapse, tol, self.HFEngine, self.approx_state) vbMng(self, "DEL", "Done recompressing approximant.", 20) def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ self.setupApprox() vbMng(self, "INIT", "Computing poles of model.", 20) poles = self.trainedModel.getPoles(*args, **kwargs) vbMng(self, "DEL", "Done computing poles.", 20) return poles def storeSamples(self, filenameBase : str = "samples", forceNewFile : bool = True) -> str: """Store samples to file.""" filename = filenameBase + "_" + self.name() if forceNewFile: filename = getNewFilename(filename, "pkl")[: - 4] return self.samplingEngine.store(filename, False) def storeTrainedModel(self, filenameBase : str = "trained_model", forceNewFile : bool = True) -> str: """Store trained reduced model to file.""" self.setupApprox() filename = None if masterCore(): vbMng(self, "INIT", "Storing trained model to file.", 20) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) pickleDump(self.trainedModel.data.__dict__, filename) vbMng(self, "DEL", "Done storing trained model.", 20) filename = bcast(filename) return filename def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" vbMng(self, "INIT", "Loading pre-trained model from file.", 20) datadict = pickleLoad(filename) self.mu0 = datadict["mu0"] self.scaleFactor = datadict["scaleFactor"] self.mus = datadict["mus"] self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data, selfkeys = self.initializeModelData(datadict) for key in selfkeys: setattr(self, key, datadict.pop(key)) approxParameters = datadict.pop("approxParameters") data.approxParameters = copy(approxParameters) for apkey in data.approxParameters.keys(): self._approxParameters[apkey] = approxParameters.pop(apkey) setattr(self, "_" + apkey, self._approxParameters[apkey]) for key in datadict: setattr(data, key, datadict[key]) self.trainedModel.data = data self._mode = RROMPy_FRAGILE vbMng(self, "DEL", "Done loading pre-trained model.", 20) diff --git a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py index fa3530d..9f42ec9 100644 --- a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py +++ b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py @@ -1,757 +1,758 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from os import mkdir, remove, rmdir import numpy as np +from collections.abc import Iterable from copy import deepcopy as copy from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.utilities.base.data_structures import purgeDict, getNewFilename from rrompy.sampling import SamplingEngineStandard, SamplingEngineStandardPOD from rrompy.utilities.poly_fitting.polynomial import polybases as ppb from rrompy.utilities.poly_fitting.radial_basis import polybases as rbpb from rrompy.utilities.poly_fitting.piecewise_linear import sparsekinds as sk from rrompy.utilities.base.types import Np2D, paramList, List, ListAny from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical.degree import reduceDegreeN from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList from rrompy.utilities.parallel import poolRank, bcast __all__ = ['GenericPivotedApproximantNoMatch', 'GenericPivotedApproximant'] class GenericPivotedApproximantBase(GenericApproximant): def __init__(self, directionPivot:ListAny, *args, storeAllSamples : bool = False, **kwargs): self._preInit() if len(directionPivot) > 1: raise RROMPyException(("Exactly 1 pivot parameter allowed in pole " "matching.")) from rrompy.parameter.parameter_sampling import (EmptySampler as ES, SparseGridSampler as SG) self._addParametersToList(["radialDirectionalWeightsMarginal"], [1.], ["samplerPivot", "SMarginal", "samplerMarginal"], [ES(), 1, SG([[-1.], [1.]])], toBeExcluded = ["sampler"]) self._directionPivot = directionPivot self.storeAllSamples = storeAllSamples super().__init__(*args, **kwargs) self._postInit() def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEngineStandardPOD else: SamplingEngine = SamplingEngineStandard self.samplingEngine = SamplingEngine(self.HFEngine, sample_state = self.approx_state, verbosity = self.verbosity) def initializeModelData(self, datadict): if "directionPivot" in datadict.keys(): from .trained_model.trained_model_pivoted_data import ( TrainedModelPivotedData) return (TrainedModelPivotedData(datadict["mu0"], datadict["mus"], datadict.pop("projMat"), datadict["scaleFactor"], datadict.pop("parameterMap"), datadict["directionPivot"]), ["mu0", "scaleFactor", "directionPivot", "mus"]) else: return super().initializeModelData(datadict) @property def npar(self): """Number of parameters.""" if hasattr(self, "_temporaryPivot"): return self.nparPivot return super().npar def checkParameterListPivot(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.nparPivot, check_if_single) def checkParameterListMarginal(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.nparMarginal, check_if_single) def mapParameterList(self, *args, **kwargs): if hasattr(self, "_temporaryPivot"): return self.mapParameterListPivot(*args, **kwargs) return super().mapParameterList(*args, **kwargs) def mapParameterListPivot(self, mu:paramList, direct : str = "F", idx : List[int] = None): if idx is None: idx = self.directionPivot else: idx = [self.directionPivot[j] for j in idx] return super().mapParameterList(mu, direct, idx) def mapParameterListMarginal(self, mu:paramList, direct : str = "F", idx : List[int] = None): if idx is None: idx = self.directionMarginal else: idx = [self.directionMarginal[j] for j in idx] return super().mapParameterList(mu, direct, idx) @property def mu0(self): """Value of mu0.""" if hasattr(self, "_temporaryPivot"): return self.checkParameterListPivot(self._mu0(self.directionPivot)) return self._mu0 @mu0.setter def mu0(self, mu0): GenericApproximant.mu0.fset(self, mu0) @property def mus(self): """Value of mus. Its assignment may reset snapshots.""" return self._mus @mus.setter def mus(self, mus): mus = self.checkParameterList(mus) musOld = copy(self.mus) if hasattr(self, '_mus') else None if (musOld is None or len(mus) != len(musOld) or not mus == musOld): self.resetSamples() self._mus = mus @property def musMarginal(self): """Value of musMarginal. Its assignment may reset snapshots.""" return self._musMarginal @musMarginal.setter def musMarginal(self, musMarginal): musMarginal = self.checkParameterListMarginal(musMarginal) if hasattr(self, '_musMarginal'): musMOld = copy(self.musMarginal) else: musMOld = None if (musMOld is None or len(musMarginal) != len(musMOld) or not musMarginal == musMOld): self.resetSamples() self._musMarginal = musMarginal @property def SMarginal(self): """Value of SMarginal.""" return self._SMarginal @SMarginal.setter def SMarginal(self, SMarginal): if SMarginal <= 0: raise RROMPyException("SMarginal must be positive.") if hasattr(self, "_SMarginal") and self._SMarginal is not None: Sold = self.SMarginal else: Sold = -1 self._SMarginal = SMarginal self._approxParameters["SMarginal"] = self.SMarginal if Sold != self.SMarginal: self.resetSamples() @property def radialDirectionalWeightsMarginal(self): """Value of radialDirectionalWeightsMarginal.""" return self._radialDirectionalWeightsMarginal @radialDirectionalWeightsMarginal.setter def radialDirectionalWeightsMarginal(self, radialDirWeightsMarg): - if hasattr(radialDirWeightsMarg, "__len__"): + if isinstance(radialDirWeightsMarg, Iterable): radialDirWeightsMarg = list(radialDirWeightsMarg) else: radialDirWeightsMarg = [radialDirWeightsMarg] self._radialDirectionalWeightsMarginal = radialDirWeightsMarg self._approxParameters["radialDirectionalWeightsMarginal"] = ( self.radialDirectionalWeightsMarginal) @property def directionPivot(self): """Value of directionPivot. Its assignment may reset snapshots.""" return self._directionPivot @directionPivot.setter def directionPivot(self, directionPivot): if hasattr(self, '_directionPivot'): directionPivotOld = copy(self.directionPivot) else: directionPivotOld = None if (directionPivotOld is None or len(directionPivot) != len(directionPivotOld) or not directionPivot == directionPivotOld): self.resetSamples() self._directionPivot = directionPivot @property def directionMarginal(self): return [x for x in range(self.HFEngine.npar) \ if x not in self.directionPivot] @property def nparPivot(self): return len(self.directionPivot) @property def nparMarginal(self): return self.npar - self.nparPivot @property def muBounds(self): """Value of muBounds.""" return self.samplerPivot.lims @property def muBoundsMarginal(self): """Value of muBoundsMarginal.""" return self.samplerMarginal.lims @property def sampler(self): """Proxy of samplerPivot.""" return self._samplerPivot @property def samplerPivot(self): """Value of samplerPivot.""" return self._samplerPivot @samplerPivot.setter def samplerPivot(self, samplerPivot): if 'generatePoints' not in dir(samplerPivot): raise RROMPyException("Pivot sampler type not recognized.") if hasattr(self, '_samplerPivot') and self._samplerPivot is not None: samplerOld = self.samplerPivot self._samplerPivot = samplerPivot self._approxParameters["samplerPivot"] = self.samplerPivot if not 'samplerOld' in locals() or samplerOld != self.samplerPivot: self.resetSamples() @property def samplerMarginal(self): """Value of samplerMarginal.""" return self._samplerMarginal @samplerMarginal.setter def samplerMarginal(self, samplerMarginal): if 'generatePoints' not in dir(samplerMarginal): raise RROMPyException("Marginal sampler type not recognized.") if (hasattr(self, '_samplerMarginal') and self._samplerMarginal is not None): samplerOld = self.samplerMarginal self._samplerMarginal = samplerMarginal self._approxParameters["samplerMarginal"] = self.samplerMarginal if not 'samplerOld' in locals() or samplerOld != self.samplerMarginal: self.resetSamples() def computeScaleFactor(self): """Compute parameter rescaling factor.""" self.scaleFactorPivot = .5 * np.abs(( self.mapParameterListPivot(self.muBounds[0]) - self.mapParameterListPivot(self.muBounds[1]))[0]) self.scaleFactorMarginal = .5 * np.abs(( self.mapParameterListMarginal(self.muBoundsMarginal[0]) - self.mapParameterListMarginal(self.muBoundsMarginal[1]))[0]) self.scaleFactor = np.empty(self.npar) self.scaleFactor[self.directionPivot] = self.scaleFactorPivot self.scaleFactor[self.directionMarginal] = self.scaleFactorMarginal def _setupTrainedModel(self, pMat:Np2D, pMatUpdate : bool = False, forceNew : bool = False): pMatEff = self.HFEngine.applyC(pMat) if self.approx_state else pMat if forceNew or self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "mus": copy(self.mus), "projMat": pMatEff, "scaleFactor": self.scaleFactor, "parameterMap": self.HFEngine.parameterMap, "directionPivot": self.directionPivot} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel if pMatUpdate: self.trainedModel.data.projMat = np.hstack( (self.trainedModel.data.projMat, pMatEff)) else: self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.musMarginal = copy(self.musMarginal) def normApprox(self, mu:paramList) -> float: _PODOld = self.POD self._POD = False result = super().normApprox(mu) self._POD = _PODOld return result @property def storedSamplesFilenames(self) -> List[str]: if not hasattr(self, "_sampleBaseFilename"): return [] return [self._sampleBaseFilename + "{}_{}.pkl" .format(idx + 1, self.name()) for idx in range(len(self.musMarginal))] def purgeStoredSamples(self): if not hasattr(self, "_sampleBaseFilename"): return for file in self.storedSamplesFilenames: remove(file) rmdir(self._sampleBaseFilename[: -8]) def storeSamples(self, idx : int = None): """Store samples to file.""" if not hasattr(self, "_sampleBaseFilename"): filenameBase = None if poolRank() == 0: foldername = getNewFilename(self.name(), "samples") mkdir(foldername) filenameBase = foldername + "/sample_" self._sampleBaseFilename = bcast(filenameBase, force = True) if idx is not None: super().storeSamples(self._sampleBaseFilename + str(idx + 1), False) def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" super().loadTrainedModel(filename) self._musMarginal = self.trainedModel.data.musMarginal class GenericPivotedApproximantNoMatch(GenericPivotedApproximantBase): """ ROM pivoted approximant (without pole matching) computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ @property def tModelType(self): from .trained_model.trained_model_pivoted_rational_nomatch import ( TrainedModelPivotedRationalNoMatch) return TrainedModelPivotedRationalNoMatch def _finalizeMarginalization(self): self.trainedModel.setupMarginalInterp( [self.radialDirectionalWeightsMarginal]) self.trainedModel.data.approxParameters = copy(self.approxParameters) def _poleMatching(self): vbMng(self, "INIT", "Compressing poles.", 10) self.trainedModel.initializeFromRational() vbMng(self, "DEL", "Done compressing poles.", 10) class GenericPivotedApproximant(GenericPivotedApproximantBase): """ ROM pivoted approximant (with pole matching) computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'matchingMode': mode for pole matching optimization; allowed values include 'NONE' and 'SHIFT'; defaults to 'NONE'; - 'sharedRatio': required ratio of marginal points to share resonance; defaults to 1.; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL_*', 'CHEBYSHEV_*', 'LEGENDRE_*', 'NEARESTNEIGHBOR', and 'PIECEWISE_LINEAR_*'; defaults to 'MONOMIAL'; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; defaults to 'AUTO', i.e. maximum allowed; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'nNeighborsMarginal': number of marginal nearest neighbors; defaults to 1; only for 'NEARESTNEIGHBOR'; . 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights; only for radial basis. - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'matchingWeight': weight for pole matching optimization; - 'matchingMode': mode for pole matching optimization; - 'sharedRatio': required ratio of marginal points to share resonance; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; . 'nNeighborsMarginal': number of marginal nearest neighbors; . 'polydegreetypeMarginal': type of polynomial degree for marginal; . 'interpRcondMarginal': tolerance for marginal interpolation; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights. - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. matchingWeight: Weight for pole matching optimization. matchingMode: Mode for pole matching optimization. sharedRatio: Required ratio of marginal points to share resonance. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasisMarginal: Type of polynomial basis for marginal interpolation. paramsMarginal: Dictionary of parameters for marginal interpolation. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["matchingWeight", "matchingMode", "sharedRatio", "polybasisMarginal", "paramsMarginal"], [1., "NONE", 1., "MONOMIAL", {}]) self.parameterMarginalList = ["MMarginal", "nNeighborsMarginal", "polydegreetypeMarginal", "interpRcondMarginal", "radialDirectionalWeightsMarginalAdapt"] super().__init__(*args, **kwargs) self._postInit() @property def tModelType(self): from .trained_model.trained_model_pivoted_rational import ( TrainedModelPivotedRational) return TrainedModelPivotedRational @property def matchingWeight(self): """Value of matchingWeight.""" return self._matchingWeight @matchingWeight.setter def matchingWeight(self, matchingWeight): self._matchingWeight = matchingWeight self._approxParameters["matchingWeight"] = self.matchingWeight @property def matchingMode(self): """Value of matchingMode.""" return self._matchingMode @matchingMode.setter def matchingMode(self, matchingMode): matchingMode = matchingMode.upper().strip().replace(" ", "") if matchingMode != "NONE" and matchingMode[: 5] != "SHIFT": raise RROMPyException("Prescribed matching mode not recognized.") self._matchingMode = matchingMode self._approxParameters["matchingMode"] = self.matchingMode @property def sharedRatio(self): """Value of sharedRatio.""" return self._sharedRatio @sharedRatio.setter def sharedRatio(self, sharedRatio): if sharedRatio > 1.: RROMPyWarning("Shared ratio too large. Clipping to 1.") sharedRatio = 1. elif sharedRatio < 0.: RROMPyWarning("Shared ratio too small. Clipping to 0.") sharedRatio = 0. self._sharedRatio = sharedRatio self._approxParameters["sharedRatio"] = self.sharedRatio @property def polybasisMarginal(self): """Value of polybasisMarginal.""" return self._polybasisMarginal @polybasisMarginal.setter def polybasisMarginal(self, polybasisMarginal): try: polybasisMarginal = polybasisMarginal.upper().strip().replace(" ", "") if polybasisMarginal not in ppb + rbpb + ["NEARESTNEIGHBOR"] + sk: raise RROMPyException( "Prescribed marginal polybasis not recognized.") self._polybasisMarginal = polybasisMarginal except: RROMPyWarning(("Prescribed marginal polybasis not recognized. " "Overriding to 'MONOMIAL'.")) self._polybasisMarginal = "MONOMIAL" self._approxParameters["polybasisMarginal"] = self.polybasisMarginal @property def paramsMarginal(self): """Value of paramsMarginal.""" return self._paramsMarginal @paramsMarginal.setter def paramsMarginal(self, paramsMarginal): paramsMarginal = purgeDict(paramsMarginal, self.parameterMarginalList, dictname = self.name() + ".paramsMarginal", baselevel = 1) keyList = list(paramsMarginal.keys()) if not hasattr(self, "_paramsMarginal"): self._paramsMarginal = {} if "MMarginal" in keyList: MMarg = paramsMarginal["MMarginal"] elif ("MMarginal" in self.paramsMarginal and not hasattr(self, "_MMarginal_isauto")): MMarg = self.paramsMarginal["MMarginal"] else: MMarg = "AUTO" if isinstance(MMarg, str): MMarg = MMarg.strip().replace(" ","") if "-" not in MMarg: MMarg = MMarg + "-0" self._MMarginal_isauto = True self._MMarginal_shift = int(MMarg.split("-")[-1]) MMarg = 0 if MMarg < 0: raise RROMPyException("MMarginal must be non-negative.") self._paramsMarginal["MMarginal"] = MMarg if "nNeighborsMarginal" in keyList: self._paramsMarginal["nNeighborsMarginal"] = max(1, paramsMarginal["nNeighborsMarginal"]) elif "nNeighborsMarginal" not in self.paramsMarginal: self._paramsMarginal["nNeighborsMarginal"] = 1 if "polydegreetypeMarginal" in keyList: try: polydegtypeM = paramsMarginal["polydegreetypeMarginal"]\ .upper().strip().replace(" ","") if polydegtypeM not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetypeMarginal " "not recognized.")) self._paramsMarginal["polydegreetypeMarginal"] = polydegtypeM except: RROMPyWarning(("Prescribed polydegreetypeMarginal not " "recognized. Overriding to 'TOTAL'.")) self._paramsMarginal["polydegreetypeMarginal"] = "TOTAL" elif "polydegreetypeMarginal" not in self.paramsMarginal: self._paramsMarginal["polydegreetypeMarginal"] = "TOTAL" if "interpRcondMarginal" in keyList: self._paramsMarginal["interpRcondMarginal"] = ( paramsMarginal["interpRcondMarginal"]) elif "interpRcondMarginal" not in self.paramsMarginal: self._paramsMarginal["interpRcondMarginal"] = -1 if "radialDirectionalWeightsMarginalAdapt" in keyList: self._paramsMarginal["radialDirectionalWeightsMarginalAdapt"] = ( paramsMarginal["radialDirectionalWeightsMarginalAdapt"]) elif "radialDirectionalWeightsMarginalAdapt" not in self.paramsMarginal: self._paramsMarginal["radialDirectionalWeightsMarginalAdapt"] = [ -1., -1.] self._approxParameters["paramsMarginal"] = self.paramsMarginal def _setMMarginalAuto(self): if (self.polybasisMarginal not in ppb + rbpb or "MMarginal" not in self.paramsMarginal or "polydegreetypeMarginal" not in self.paramsMarginal): raise RROMPyException(("Cannot set MMarginal if " "polybasisMarginal does not allow it.")) self.paramsMarginal["MMarginal"] = max(0, reduceDegreeN( len(self.musMarginal), len(self.musMarginal), self.nparMarginal, self.paramsMarginal["polydegreetypeMarginal"]) - self._MMarginal_shift) vbMng(self, "MAIN", ("Automatically setting MMarginal to {}.").format( self.paramsMarginal["MMarginal"]), 25) def purgeparamsMarginal(self): self.paramsMarginal = {} paramsMbadkeys = [] if self.polybasisMarginal in ppb + rbpb + sk: paramsMbadkeys += ["nNeighborsMarginal"] if self.polybasisMarginal not in rbpb: paramsMbadkeys += ["radialDirectionalWeightsMarginalAdapt"] if self.polybasisMarginal in ["NEARESTNEIGHBOR"] + sk: paramsMbadkeys += ["MMarginal", "polydegreetypeMarginal", "interpRcondMarginal"] if hasattr(self, "_MMarginal_isauto"): del self._MMarginal_isauto if hasattr(self, "_MMarginal_shift"): del self._MMarginal_shift for key in paramsMbadkeys: if key in self._paramsMarginal: del self._paramsMarginal[key] self._approxParameters["paramsMarginal"] = self.paramsMarginal def _finalizeMarginalization(self): vbMng(self, "INIT", "Checking shared ratio.", 10) msg = self.trainedModel.checkSharedRatio(self.sharedRatio) vbMng(self, "DEL", "Done checking." + msg, 10) if self.polybasisMarginal in rbpb + ["NEARESTNEIGHBOR"]: self.computeScaleFactor() rDWMEff = np.array([w * f for w, f in zip( self.radialDirectionalWeightsMarginal, self.scaleFactorMarginal)]) if self.polybasisMarginal in ppb + rbpb + sk: interpPars = [self.polybasisMarginal] if self.polybasisMarginal in ppb + rbpb: if self.polybasisMarginal in rbpb: interpPars += [rDWMEff] interpPars += [self.verbosity >= 5, self.paramsMarginal["polydegreetypeMarginal"] == "TOTAL"] if self.polybasisMarginal in ppb: interpPars += [{}] else: # if self.polybasisMarginal in rbpb: interpPars += [{"optimizeScalingBounds":self.paramsMarginal[ "radialDirectionalWeightsMarginalAdapt"]}] interpPars += [ {"rcond":self.paramsMarginal["interpRcondMarginal"]}] extraPar = hasattr(self, "_MMarginal_isauto") else: # if self.polybasisMarginal in sk: idxEff = [x for x in range(self.samplerMarginal.npoints) if not hasattr(self.trainedModel, "_idxExcl") or x not in self.trainedModel._idxExcl] extraPar = self.samplerMarginal.depth[idxEff] else: # if self.polybasisMarginal == "NEARESTNEIGHBOR": interpPars = [self.paramsMarginal["nNeighborsMarginal"], rDWMEff] extraPar = None self.trainedModel.setupMarginalInterp(self, interpPars, extraPar) self.trainedModel.data.approxParameters = copy(self.approxParameters) def _poleMatching(self): vbMng(self, "INIT", "Compressing and matching poles.", 10) self.trainedModel.initializeFromRational(self.matchingWeight, self.matchingMode, self.HFEngine, False) vbMng(self, "DEL", "Done compressing and matching poles.", 10) def setupApprox(self, *args, **kwargs) -> int: if self.checkComputedApprox(): return -1 self.purgeparamsMarginal() return super().setupApprox(*args, **kwargs) diff --git a/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py b/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py index e398688..07ce7a8 100644 --- a/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py +++ b/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py @@ -1,735 +1,736 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod from copy import deepcopy as copy import numpy as np +from collections.abc import Iterable from matplotlib import pyplot as plt from rrompy.reduction_methods.pivoted.generic_pivoted_approximant import ( GenericPivotedApproximantBase, GenericPivotedApproximantNoMatch, GenericPivotedApproximant) from rrompy.reduction_methods.pivoted.gather_pivoted_approximant import ( gatherPivotedApproximant) from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, paramVal, paramList, ListAny) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical.point_matching import (pointMatching, chordalMetricAdjusted) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import emptyParameterList from rrompy.utilities.parallel import (masterCore, indicesScatter, arrayGatherv, isend) __all__ = ['GenericPivotedGreedyApproximantNoMatch', 'GenericPivotedGreedyApproximant'] class GenericPivotedGreedyApproximantBase(GenericPivotedApproximantBase): _allowedEstimatorKindsMarginal = ["LOOK_AHEAD", "LOOK_AHEAD_RECOVER", "NONE"] def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["matchingWeightError", "errorEstimatorKindMarginal", "greedyTolMarginal", "maxIterMarginal"], [0., "NONE", 1e-1, 1e2]) super().__init__(*args, **kwargs) self._postInit() @property def scaleFactorDer(self): """Value of scaleFactorDer.""" if self._scaleFactorDer == "NONE": return 1. if self._scaleFactorDer == "AUTO": return self._scaleFactorOldPivot return self._scaleFactorDer @scaleFactorDer.setter def scaleFactorDer(self, scaleFactorDer): if isinstance(scaleFactorDer, (str,)): scaleFactorDer = scaleFactorDer.upper() - elif hasattr(scaleFactorDer, "__len__"): + elif isinstance(scaleFactorDer, Iterable): scaleFactorDer = list(scaleFactorDer) self._scaleFactorDer = scaleFactorDer self._approxParameters["scaleFactorDer"] = self._scaleFactorDer @property def samplerMarginal(self): """Value of samplerMarginal.""" return self._samplerMarginal @samplerMarginal.setter def samplerMarginal(self, samplerMarginal): if 'refine' not in dir(samplerMarginal): raise RROMPyException("Marginal sampler type not recognized.") GenericPivotedApproximantBase.samplerMarginal.fset(self, samplerMarginal) @property def errorEstimatorKindMarginal(self): """Value of errorEstimatorKindMarginal.""" return self._errorEstimatorKindMarginal @errorEstimatorKindMarginal.setter def errorEstimatorKindMarginal(self, errorEstimatorKindMarginal): errorEstimatorKindMarginal = errorEstimatorKindMarginal.upper() if errorEstimatorKindMarginal not in ( self._allowedEstimatorKindsMarginal): RROMPyWarning(("Marginal error estimator kind not recognized. " "Overriding to 'NONE'.")) errorEstimatorKindMarginal = "NONE" self._errorEstimatorKindMarginal = errorEstimatorKindMarginal self._approxParameters["errorEstimatorKindMarginal"] = ( self.errorEstimatorKindMarginal) @property def matchingWeightError(self): """Value of matchingWeightError.""" return self._matchingWeightError @matchingWeightError.setter def matchingWeightError(self, matchingWeightError): self._matchingWeightError = matchingWeightError self._approxParameters["matchingWeightError"] = ( self.matchingWeightError) @property def greedyTolMarginal(self): """Value of greedyTolMarginal.""" return self._greedyTolMarginal @greedyTolMarginal.setter def greedyTolMarginal(self, greedyTolMarginal): if greedyTolMarginal < 0: raise RROMPyException("greedyTolMarginal must be non-negative.") if (hasattr(self, "_greedyTolMarginal") and self.greedyTolMarginal is not None): greedyTolMarginalold = self.greedyTolMarginal else: greedyTolMarginalold = -1 self._greedyTolMarginal = greedyTolMarginal self._approxParameters["greedyTolMarginal"] = self.greedyTolMarginal if greedyTolMarginalold != self.greedyTolMarginal: self.resetSamples() @property def maxIterMarginal(self): """Value of maxIterMarginal.""" return self._maxIterMarginal @maxIterMarginal.setter def maxIterMarginal(self, maxIterMarginal): if maxIterMarginal <= 0: raise RROMPyException("maxIterMarginal must be positive.") if (hasattr(self, "_maxIterMarginal") and self.maxIterMarginal is not None): maxIterMarginalold = self.maxIterMarginal else: maxIterMarginalold = -1 self._maxIterMarginal = maxIterMarginal self._approxParameters["maxIterMarginal"] = self.maxIterMarginal if maxIterMarginalold != self.maxIterMarginal: self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() if not hasattr(self, "_temporaryPivot"): self._mus = emptyParameterList() self._musMarginal = emptyParameterList() if hasattr(self, "samplerMarginal"): self.samplerMarginal.reset() if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() def _getDistanceApp(self, polesEx:Np1D, resEx:Np2D, muTest:paramVal, foci:Tuple[float, float], ground:float) -> float: polesAp = self.trainedModel.interpolateMarginalPoles(muTest)[0] if self.matchingWeightError != 0: resAp = self.trainedModel.interpolateMarginalCoeffs(muTest)[0][ : len(polesAp), :] resEx = self.trainedModel.data.projMat[:, : resEx.shape[1]].dot(resEx.T) resAp = self.trainedModel.data.projMat[:, : resAp.shape[1]].dot(resAp.T) else: resAp = None dist = chordalMetricAdjusted(polesEx, polesAp, self.matchingWeightError, resEx, resAp, self.HFEngine, False) pmR, pmC = pointMatching(dist) return np.mean(dist[pmR, pmC]) def getErrorEstimatorMarginalLookAhead(self) -> Np1D: if not hasattr(self.trainedModel, "_musMExcl"): err = np.zeros(0) err[:] = np.inf self._musMarginalTestIdxs = np.zeros(0, dtype = int) return err self._musMarginalTestIdxs = np.array(self.trainedModel._idxExcl, dtype = int) idx, sizes = indicesScatter(len(self.trainedModel._musMExcl), return_sizes = True) err = [] if len(idx) > 0: self.verbosity -= 35 self.trainedModel.verbosity -= 35 foci = self.samplerPivot.normalFoci() ground = self.samplerPivot.groundPotential() for j in idx: muTest = self.trainedModel._musMExcl[j] HITest = self.trainedModel._HIsExcl[j] polesEx = HITest.poles idxGood = np.logical_not(np.logical_or(np.isinf(polesEx), np.isnan(polesEx))) polesEx = polesEx[idxGood] if self.matchingWeightError != 0: resEx = HITest.coeffs[np.where(idxGood)[0]] else: resEx = None if len(polesEx) == 0: err += [0.] continue err += [self._getDistanceApp(polesEx, resEx, muTest, foci, ground)] self.verbosity += 35 self.trainedModel.verbosity += 35 return arrayGatherv(np.array(err), sizes) def getErrorEstimatorMarginalNone(self) -> Np1D: nErr = len(self.trainedModel.data.musMarginal) self._musMarginalTestIdxs = np.arange(nErr) return (1. + self.greedyTolMarginal) * np.ones(nErr) def errorEstimatorMarginal(self, return_max : bool = False) -> Np1D: vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format( self.trainedModel.data.musMarginal), 10) if self.errorEstimatorKindMarginal == "NONE": nErr = len(self.trainedModel.data.musMarginal) self._musMarginalTestIdxs = np.arange(nErr) err = (1. + self.greedyTolMarginal) * np.ones(nErr) else:#if self.errorEstimatorKindMarginal[: 10] == "LOOK_AHEAD": err = self.getErrorEstimatorMarginalLookAhead() vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if not return_max: return err idxMaxEst = np.where(err > self.greedyTolMarginal)[0] maxErr = err[idxMaxEst] if self.errorEstimatorKindMarginal == "NONE": maxErr = None return err, idxMaxEst, maxErr def plotEstimatorMarginal(self, est:Np1D, idxMax:List[int], estMax:List[float]): if self.errorEstimatorKindMarginal == "NONE": return if (not (np.any(np.isnan(est)) or np.any(np.isinf(est))) and masterCore() and hasattr(self.trainedModel, "_musMExcl")): fig = plt.figure(figsize = plt.figaspect(1. / self.nparMarginal)) for jpar in range(self.nparMarginal): ax = fig.add_subplot(1, self.nparMarginal, 1 + jpar) musre = np.real(self.trainedModel._musMExcl) if len(idxMax) > 0 and estMax is not None: maxrej = musre[idxMax, jpar] errCP = copy(est) idx = np.delete(np.arange(self.nparMarginal), jpar) while len(musre) > 0: if self.nparMarginal == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, idx] - musre[0, idx]), 1), 0.))[0] currIdxSorted = currIdx[np.argsort(musre[currIdx, jpar])] ax.semilogy(musre[currIdxSorted, jpar], errCP[currIdxSorted], 'k.-', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) ax.semilogy(self.musMarginal.re(jpar), (self.greedyTolMarginal,) * len(self.musMarginal), '*m') if len(idxMax) > 0 and estMax is not None: ax.semilogy(maxrej, estMax, 'xr') ax.set_xlim(*list(self.samplerMarginal.lims.re(jpar))) ax.grid() plt.tight_layout() plt.show() def _addMarginalSample(self, mus:paramList): mus = self.checkParameterListMarginal(mus) if len(mus) == 0: return self._nmusOld, nmus = len(self.musMarginal), len(mus) if (hasattr(self, "trainedModel") and self.trainedModel is not None and hasattr(self.trainedModel, "_musMExcl")): self._nmusOld += len(self.trainedModel._musMExcl) vbMng(self, "MAIN", ("Adding marginal sample point{} no. {}{} at {} to training " "set.").format("s" * (nmus > 1), self._nmusOld + 1, "--{}".format(self._nmusOld + nmus) * (nmus > 1), mus), 3) self.musMarginal.append(mus) self.setupApproxPivoted(mus) self._poleMatching() del self._nmusOld if (self.errorEstimatorKindMarginal[: 10] == "LOOK_AHEAD" and not self.firstGreedyIterM): ubRange = len(self.trainedModel.data.musMarginal) if hasattr(self.trainedModel, "_idxExcl"): shRange = len(self.trainedModel._musMExcl) else: shRange = 0 testIdxs = list(range(ubRange + shRange - len(mus), ubRange + shRange)) for j in testIdxs[::-1]: self.musMarginal.pop(j - shRange) if hasattr(self.trainedModel, "_idxExcl"): testIdxs = self.trainedModel._idxExcl + testIdxs self._updateTrainedModelMarginalSamples(testIdxs) self._finalizeMarginalization() self._SMarginal = len(self.musMarginal) self._approxParameters["SMarginal"] = self.SMarginal self.trainedModel.data.approxParameters["SMarginal"] = self.SMarginal def greedyNextSampleMarginal(self, muidx:List[int], plotEst : str = "NONE") \ -> Tuple[Np1D, List[int], float, paramVal]: RROMPyAssert(self._mode, message = "Cannot add greedy sample.") muidx = self._musMarginalTestIdxs[muidx] if (self.errorEstimatorKindMarginal[: 10] == "LOOK_AHEAD" and not self.firstGreedyIterM): if not hasattr(self.trainedModel, "_idxExcl"): raise RROMPyException(("Sample index to be added not present " "in trained model.")) testIdxs = copy(self.trainedModel._idxExcl) skippedIdx = 0 for cj, j in enumerate(self.trainedModel._idxExcl): if j in muidx: testIdxs.pop(skippedIdx) self.musMarginal.insert(self.trainedModel._musMExcl[cj], j - skippedIdx) else: skippedIdx += 1 if len(self.trainedModel._idxExcl) < (len(muidx) + len(testIdxs)): raise RROMPyException(("Sample index to be added not present " "in trained model.")) self._updateTrainedModelMarginalSamples(testIdxs) self._SMarginal = len(self.musMarginal) self._approxParameters["SMarginal"] = self.SMarginal self.trainedModel.data.approxParameters["SMarginal"] = ( self.SMarginal) self.firstGreedyIterM = False idxAdded = self.samplerMarginal.refine(muidx)[0] self._addMarginalSample(self.samplerMarginal.points[idxAdded]) errorEstTest, muidx, maxErrorEst = self.errorEstimatorMarginal(True) if plotEst == "ALL": self.plotEstimatorMarginal(errorEstTest, muidx, maxErrorEst) return (errorEstTest, muidx, maxErrorEst, self.samplerMarginal.points[muidx]) def _preliminaryTrainingMarginal(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if np.sum(self.samplingEngine.nsamples) > 0: return self.resetSamples() self._addMarginalSample(self.samplerMarginal.generatePoints( self.SMarginal)) def _preSetupApproxPivoted(self, mus:paramList) \ -> Tuple[ListAny, ListAny, ListAny]: self.computeScaleFactor() if self.trainedModel is None: self._setupTrainedModel(np.zeros((0, 0))) self.trainedModel.data.Qs, self.trainedModel.data.Ps = [], [] self.trainedModel.data.Psupp = [] self._trainedModelOld = copy(self.trainedModel) self._scaleFactorOldPivot = copy(self.scaleFactor) self.scaleFactor = self.scaleFactorPivot self._temporaryPivot = 1 self._musLoc = copy(self.mus) idx, sizes = indicesScatter(len(mus), return_sizes = True) emptyCores = np.where(np.logical_not(sizes))[0] self.verbosity -= 15 return idx, sizes, emptyCores def _postSetupApproxPivoted(self, mus:Np2D, pMat:Np2D, Ps:ListAny, Qs:ListAny, sizes:ListAny): self.scaleFactor = self._scaleFactorOldPivot del self._scaleFactorOldPivot, self._temporaryPivot pMat, Ps, Qs, mus, nsamples = gatherPivotedApproximant(pMat, Ps, Qs, mus, sizes, self.polybasis) if len(self._musLoc) > 0: self._mus = self.checkParameterList(self._musLoc) self._mus.append(mus) else: self._mus = self.checkParameterList(mus) self.trainedModel = self._trainedModelOld del self._trainedModelOld padLeft = self.trainedModel.data.projMat.shape[1] suppNew = np.append(0, np.cumsum(nsamples)) self._setupTrainedModel(pMat, padLeft > 0) self.trainedModel.data.Qs += Qs self.trainedModel.data.Ps += Ps self.trainedModel.data.Psupp += list(padLeft + suppNew[: -1]) self.trainedModel.data.approxParameters = copy(self.approxParameters) self.verbosity += 15 def _localPivotedResult(self, pMat:Np2D, req:ListAny, emptyCores:ListAny, mus:Np2D) -> Tuple[Np2D, ListAny, Np2D]: if pMat is None: mus = copy(self.samplingEngine.mus.data) pMat = copy(self.samplingEngine.projectionMatrix) if masterCore(): for dest in emptyCores: req += [isend((len(pMat), pMat.dtype, mus.dtype), dest = dest, tag = dest)] else: mus = np.vstack((mus, self.samplingEngine.mus.data)) pMat = np.hstack((pMat, self.samplingEngine.projectionMatrix)) return pMat, req, mus @abstractmethod def setupApproxPivoted(self, mus:paramList) -> int: if self.checkComputedApproxPivoted(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up pivoted approximant.", 10) self._preSetupApproxPivoted() data = [] pass self._postSetupApproxPivoted(mus, data) vbMng(self, "DEL", "Done setting up pivoted approximant.", 10) return 0 def setupApprox(self, plotEst : str = "NONE") -> int: """Compute greedy snapshots of solution map.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") vbMng(self, "INIT", "Starting computation of snapshots.", 3) max2ErrorEst, self.firstGreedyIterM = np.inf, True self._preliminaryTrainingMarginal() if self.errorEstimatorKindMarginal == "NONE": muidx = [] else:#if self.errorEstimatorKindMarginal[: 10] == "LOOK_AHEAD": muidx = np.arange(len(self.trainedModel.data.musMarginal)) self._musMarginalTestIdxs = np.array(muidx) while self.firstGreedyIterM or (max2ErrorEst > self.greedyTolMarginal and self.samplerMarginal.npoints < self.maxIterMarginal): errorEstTest, muidx, maxErrorEst, mu = \ self.greedyNextSampleMarginal(muidx, plotEst) if maxErrorEst is None: max2ErrorEst = 1. + self.greedyTolMarginal else: if len(maxErrorEst) > 0: max2ErrorEst = np.max(maxErrorEst) else: max2ErrorEst = np.max(errorEstTest) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(max2ErrorEst), 3) if plotEst == "LAST": self.plotEstimatorMarginal(errorEstTest, muidx, maxErrorEst) vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(len(self.mus)), 3) if (self.errorEstimatorKindMarginal == "LOOK_AHEAD_RECOVER" and hasattr(self.trainedModel, "_idxExcl") and len(self.trainedModel._idxExcl) > 0): vbMng(self, "INIT", "Recovering {} test models.".format( len(self.trainedModel._idxExcl)), 7) for j, mu in zip(self.trainedModel._idxExcl, self.trainedModel._musMExcl): self.musMarginal.insert(mu, j) self._updateTrainedModelMarginalSamples() self._finalizeMarginalization() self._SMarginal = len(self.musMarginal) self._approxParameters["SMarginal"] = self.SMarginal self.trainedModel.data.approxParameters["SMarginal"] = ( self.SMarginal) vbMng(self, "DEL", "Done recovering test models.", 7) return 0 def checkComputedApproxPivoted(self) -> bool: return (super().checkComputedApprox() and len(self.musMarginal) == len(self.trainedModel.data.musMarginal)) class GenericPivotedGreedyApproximantNoMatch( GenericPivotedGreedyApproximantBase, GenericPivotedApproximantNoMatch): """ ROM pivoted greedy interpolant computation for parametric problems (without pole matching) (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'matchingWeightError': weight for pole matching optimization in error estimation; defaults to 0; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': number of starting marginal samples; - 'samplerMarginal': marginal sample point generator via sparse grid; - 'errorEstimatorKindMarginal': kind of marginal error estimator; available values include 'LOOK_AHEAD', 'LOOK_AHEAD_RECOVER', and 'NONE'; defaults to 'NONE'; - 'greedyTolMarginal': uniform error tolerance for marginal greedy algorithm; defaults to 1e-1; - 'maxIterMarginal': maximum number of marginal greedy steps; defaults to 1e2; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'matchingWeightError': weight for pole matching optimization in error estimation; - 'errorEstimatorKindMarginal': kind of marginal error estimator; - 'greedyTolMarginal': uniform error tolerance for marginal greedy algorithm; - 'maxIterMarginal': maximum number of marginal greedy steps; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator via sparse grid. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. matchingWeightError: Weight for pole matching optimization in error estimation. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator via sparse grid. errorEstimatorKindMarginal: Kind of marginal error estimator. greedyTolMarginal: Uniform error tolerance for marginal greedy algorithm. maxIterMarginal: Maximum number of marginal greedy steps. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def _poleMatching(self): vbMng(self, "INIT", "Compressing poles.", 10) self.trainedModel.initializeFromRational() vbMng(self, "DEL", "Done compressing poles.", 10) def _updateTrainedModelMarginalSamples(self, idx : ListAny = []): self.trainedModel.updateEffectiveSamples(idx) class GenericPivotedGreedyApproximant(GenericPivotedGreedyApproximantBase, GenericPivotedApproximant): """ ROM pivoted greedy interpolant computation for parametric problems (with pole matching) (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'matchingMode': mode for pole matching optimization; allowed values include 'NONE' and 'SHIFT'; defaults to 'NONE'; - 'sharedRatio': required ratio of marginal points to share resonance; defaults to 1.; - 'matchingWeightError': weight for pole matching optimization in error estimation; defaults to 0; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': number of starting marginal samples; - 'samplerMarginal': marginal sample point generator via sparse grid; - 'errorEstimatorKindMarginal': kind of marginal error estimator; available values include 'LOOK_AHEAD', 'LOOK_AHEAD_RECOVER', and 'NONE'; defaults to 'NONE'; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL_*', 'CHEBYSHEV_*', 'LEGENDRE_*', 'NEARESTNEIGHBOR', and 'PIECEWISE_LINEAR_*'; defaults to 'MONOMIAL'; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; defaults to 'AUTO', i.e. maximum allowed; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'nNeighborsMarginal': number of marginal nearest neighbors; defaults to 1; only for 'NEARESTNEIGHBOR'; . 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights; only for radial basis. - 'greedyTolMarginal': uniform error tolerance for marginal greedy algorithm; defaults to 1e-1; - 'maxIterMarginal': maximum number of marginal greedy steps; defaults to 1e2; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'matchingWeight': weight for pole matching optimization; - 'matchingMode': mode for pole matching optimization; - 'sharedRatio': required ratio of marginal points to share resonance; - 'matchingWeightError': weight for pole matching optimization in error estimation; - 'errorEstimatorKindMarginal': kind of marginal error estimator; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; . 'nNeighborsMarginal': number of marginal nearest neighbors; . 'polydegreetypeMarginal': type of polynomial degree for marginal; . 'interpRcondMarginal': tolerance for marginal interpolation; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights. - 'greedyTolMarginal': uniform error tolerance for marginal greedy algorithm; - 'maxIterMarginal': maximum number of marginal greedy steps; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator via sparse grid. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. matchingWeight: Weight for pole matching optimization. matchingMode: Mode for pole matching optimization. sharedRatio: Required ratio of marginal points to share resonance. matchingWeightError: Weight for pole matching optimization in error estimation. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator via sparse grid. errorEstimatorKindMarginal: Kind of marginal error estimator. polybasisMarginal: Type of polynomial basis for marginal interpolation. paramsMarginal: Dictionary of parameters for marginal interpolation. greedyTolMarginal: Uniform error tolerance for marginal greedy algorithm. maxIterMarginal: Maximum number of marginal greedy steps. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def _poleMatching(self): vbMng(self, "INIT", "Compressing and matching poles.", 10) self.trainedModel.initializeFromRational(self.matchingWeight, self.matchingMode, self.HFEngine, False) vbMng(self, "DEL", "Done compressing and matching poles.", 10) def _updateTrainedModelMarginalSamples(self, idx : ListAny = []): self.trainedModel.updateEffectiveSamples(idx, self.matchingWeight, self.matchingMode, self.HFEngine, False) def setupApprox(self, *args, **kwargs) -> int: if self.checkComputedApprox(): return -1 self.purgeparamsMarginal() _polybasisMarginal = self.polybasisMarginal self._polybasisMarginal = ("PIECEWISE_LINEAR_" + self.samplerMarginal.kind) setupOK = super().setupApprox(*args, **kwargs) self._polybasisMarginal = _polybasisMarginal self._finalizeMarginalization() return setupOK diff --git a/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py b/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py index 945ce9f..8eab548 100644 --- a/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py +++ b/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py @@ -1,522 +1,527 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_pivoted_approximant import (GenericPivotedApproximantBase, GenericPivotedApproximantNoMatch, GenericPivotedApproximant) from .gather_pivoted_approximant import gatherPivotedApproximant from rrompy.reduction_methods.standard.greedy.rational_interpolant_greedy \ import RationalInterpolantGreedy from rrompy.reduction_methods.standard.greedy.generic_greedy_approximant \ import pruneSamples from rrompy.utilities.base.types import Np1D from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting.polynomial import polyvander as pv from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.parameter import emptyParameterList, parameterList from rrompy.utilities.parallel import poolRank, indicesScatter, isend, recv __all__ = ['RationalInterpolantGreedyPivotedNoMatch', 'RationalInterpolantGreedyPivoted'] class RationalInterpolantGreedyPivotedBase(GenericPivotedApproximantBase, RationalInterpolantGreedy): def __init__(self, *args, **kwargs): self._preInit() super().__init__(*args, **kwargs) self._ignoreResidues = self.nparPivot > 1 self._postInit() @property def tModelType(self): if hasattr(self, "_temporaryPivot"): return RationalInterpolantGreedy.tModelType.fget(self) return super().tModelType def _polyvanderAuxiliary(self, mus, deg, *args): degEff = [0] * self.npar degEff[self.directionPivot[0]] = deg return pv(mus, degEff, *args) def _marginalizeMiscellanea(self, forward:bool): if forward: self._m_selfmus = copy(self.mus) self._m_HFEparameterMap = copy(self.HFEngine.parameterMap) self._mus = self.checkParameterListPivot( self.mus(self.directionPivot)) self.HFEngine.parameterMap = { "F": [self.HFEngine.parameterMap["F"][self.directionPivot[0]]], "B": [self.HFEngine.parameterMap["B"][self.directionPivot[0]]]} else: self._mus = self._m_selfmus self.HFEngine.parameterMap = self._m_HFEparameterMap del self._m_selfmus, self._m_HFEparameterMap def _marginalizeTrainedModel(self, forward:bool): if forward: del self._temporaryPivot self.trainedModel.data.mu0 = self.mu0 self.trainedModel.data.scaleFactor = [1.] * self.npar self.trainedModel.data.scaleFactor[self.directionPivot[0]] = ( self.scaleFactor[0]) self.trainedModel.data.parameterMap = self.HFEngine.parameterMap self._m_musUniqueCN = copy(self._musUniqueCN) musUniqueCNAux = np.zeros((self.S, self.npar), dtype = self._musUniqueCN.dtype) musUniqueCNAux[:, self.directionPivot[0]] = self._musUniqueCN(0) self._musUniqueCN = self.checkParameterList(musUniqueCNAux) self._m_derIdxs = copy(self._derIdxs) for j in range(len(self._derIdxs)): for l in range(len(self._derIdxs[j])): derjl = self._derIdxs[j][l][0] self._derIdxs[j][l] = [0] * self.npar self._derIdxs[j][l][self.directionPivot[0]] = derjl self.trainedModel.data.Q._dirPivot = self.directionPivot[0] self.trainedModel.data.P._dirPivot = self.directionPivot[0] else: self._temporaryPivot = 1 self.trainedModel.data.mu0 = self.checkParameterListPivot( self.mu0(self.directionPivot)) self.trainedModel.data.scaleFactor = self.scaleFactor self.trainedModel.data.parameterMap = { "F": [self.HFEngine.parameterMap["F"][self.directionPivot[0]]], "B": [self.HFEngine.parameterMap["B"][self.directionPivot[0]]]} self._musUniqueCN = copy(self._m_musUniqueCN) self._derIdxs = copy(self._m_derIdxs) del self._m_musUniqueCN, self._m_derIdxs del self.trainedModel.data.Q._dirPivot del self.trainedModel.data.P._dirPivot self.trainedModel.data.npar = self.npar def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: """Standard residual-based error estimator.""" - self._marginalizeMiscellanea(True) setupOK = self.setupApproxLocal() - self._marginalizeMiscellanea(False) if setupOK > 0: err = np.empty(len(mus)) err[:] = np.nan if not return_max: return err return err, [- setupOK], np.nan self._marginalizeTrainedModel(True) errRes = super().errorEstimator(mus, return_max) self._marginalizeTrainedModel(False) return errRes def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") self._S = self._setSampleBatch(self.S) self.resetSamples() self.samplingEngine.scaleFactor = self.scaleFactorDer musPivot = self.trainSetGenerator.generatePoints(self.S) while len(musPivot) > self.S: musPivot.pop() muTestPivot = self.samplerPivot.generatePoints(self.nTestPoints, False) idxPop = pruneSamples(self.mapParameterListPivot(muTestPivot), self.mapParameterListPivot(musPivot), 1e-10 * self.scaleFactorPivot[0]) self._mus = emptyParameterList() self.mus.reset((self.S, self.npar + len(self.musMargLoc))) muTestBase = emptyParameterList() muTestBase.reset((len(muTestPivot), self.npar + len(self.musMargLoc))) for k in range(self.S): muk = np.empty_like(self.mus[0]) muk[self.directionPivot] = musPivot[k] muk[self.directionMarginal] = self.musMargLoc self.mus[k] = muk for k in range(len(muTestPivot)): muk = np.empty_like(muTestBase[0]) muk[self.directionPivot] = muTestPivot[k] muk[self.directionMarginal] = self.musMargLoc muTestBase[k] = muk muTestBase.pop(idxPop) muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 3) self.samplingEngine.iterSample(self.mus) self._S = len(self.mus) self._approxParameters["S"] = self.S self.muTest = parameterList(muTestBase) self.muTest.append(muLast) self.M, self.N = ("AUTO",) * 2 + def setupApproxLocal(self) -> int: + """Compute rational interpolant.""" + self._marginalizeMiscellanea(True) + setupOK = super().setupApproxLocal() + self._marginalizeMiscellanea(False) + return setupOK + def setupApprox(self, *args, **kwargs) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeScaleFactor() self._musMarginal = self.samplerMarginal.generatePoints(self.SMarginal) while len(self.musMarginal) > self.SMarginal: self.musMarginal.pop() S0 = copy(self.S) idx, sizes = indicesScatter(len(self.musMarginal), return_sizes = True) pMat, Ps, Qs, mus = None, [], [], None req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(idx) == 0: vbMng(self, "MAIN", "Idling.", 25) if self.storeAllSamples: self.storeSamples() pL, pT, mT = recv(source = 0, tag = poolRank()) pMat = np.empty((pL, 0), dtype = pT) mus = np.empty((0, self.mu0.shape[1]), dtype = mT) else: _scaleFactorOldPivot = copy(self.scaleFactor) self.scaleFactor = self.scaleFactorPivot self._temporaryPivot = 1 for i in idx: self.musMargLoc = self.musMarginal[i] vbMng(self, "MAIN", "Building marginal model no. {} at {}.".format(i + 1, self.musMargLoc), 5) self.samplingEngine.resetHistory() self.trainedModel = None self.verbosity -= 5 self.samplingEngine.verbosity -= 5 super().setupApprox(*args, **kwargs) self.verbosity += 5 self.samplingEngine.verbosity += 5 if self.storeAllSamples: self.storeSamples(i) if pMat is None: mus = copy(self.samplingEngine.mus.data) pMat = copy(self.samplingEngine.projectionMatrix) if i == 0: for dest in emptyCores: req += [isend((len(pMat), pMat.dtype, mus.dtype), dest = dest, tag = dest)] else: mus = np.vstack((mus, self.samplingEngine.mus.data)) pMat = np.hstack((pMat, self.samplingEngine.projectionMatrix)) Ps += [copy(self.trainedModel.data.P)] Qs += [copy(self.trainedModel.data.Q)] self._S = S0 del self._temporaryPivot, self.musMargLoc self.scaleFactor = _scaleFactorOldPivot for r in req: r.wait() pMat, Ps, Qs, mus, nsamples = gatherPivotedApproximant(pMat, Ps, Qs, mus, sizes, self.polybasis) self._mus = self.checkParameterList(mus) Psupp = np.append(0, np.cumsum(nsamples)) self._setupTrainedModel(pMat, forceNew = True) self.trainedModel.data.Qs, self.trainedModel.data.Ps = Qs, Ps self.trainedModel.data.Psupp = list(Psupp[: -1]) self._poleMatching() self._finalizeMarginalization() vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 class RationalInterpolantGreedyPivotedNoMatch( RationalInterpolantGreedyPivotedBase, GenericPivotedApproximantNoMatch): """ ROM pivoted rational interpolant (without pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasis': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'LOOK_AHEAD', 'LOOK_AHEAD_RES', and 'NONE'; defaults to 'NONE'; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'NODAL', 'LOEWNER', and 'BARYCENTRIC'; defaults to 'NORM'; - 'interpRcond': tolerance for pivot interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'polybasis': type of polynomial basis for pivot interpolation; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpRcond': tolerance for pivot interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasis: Type of polynomial basis for pivot interpolation. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. errorEstimatorKind: kind of error estimator. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. functionalSolve: Strategy for minimization of denominator functional. interpRcond: Tolerance for pivot interpolation. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ class RationalInterpolantGreedyPivoted(RationalInterpolantGreedyPivotedBase, GenericPivotedApproximant): """ ROM pivoted rational interpolant (with pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'matchingMode': mode for pole matching optimization; allowed values include 'NONE' and 'SHIFT'; defaults to 'NONE'; - 'sharedRatio': required ratio of marginal points to share resonance; defaults to 1.; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasis': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL_*', 'CHEBYSHEV_*', 'LEGENDRE_*', 'NEARESTNEIGHBOR', and 'PIECEWISE_LINEAR_*'; defaults to 'MONOMIAL'; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; defaults to 'AUTO', i.e. maximum allowed; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'nNeighborsMarginal': number of marginal nearest neighbors; defaults to 1; only for 'NEARESTNEIGHBOR'; . 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights; only for radial basis. - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'LOOK_AHEAD', 'LOOK_AHEAD_RES', and 'NONE'; defaults to 'NONE'; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'NODAL', 'LOEWNER', and 'BARYCENTRIC'; defaults to 'NORM'; - 'interpRcond': tolerance for pivot interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'matchingWeight': weight for pole matching optimization; - 'matchingMode': mode for pole matching optimization; - 'sharedRatio': required ratio of marginal points to share resonance; - 'polybasis': type of polynomial basis for pivot interpolation; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; . 'nNeighborsMarginal': number of marginal nearest neighbors; . 'polydegreetypeMarginal': type of polynomial degree for marginal; . 'interpRcondMarginal': tolerance for marginal interpolation; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpRcond': tolerance for pivot interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. matchingWeight: Weight for pole matching optimization. matchingMode: Mode for pole matching optimization. sharedRatio: Required ratio of marginal points to share resonance. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasis: Type of polynomial basis for pivot interpolation. polybasisMarginal: Type of polynomial basis for marginal interpolation. paramsMarginal: Dictionary of parameters for marginal interpolation. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. errorEstimatorKind: kind of error estimator. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. functionalSolve: Strategy for minimization of denominator functional. interpRcond: Tolerance for pivot interpolation. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ diff --git a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py index 2e55405..ee09553 100644 --- a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py +++ b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py @@ -1,455 +1,456 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # -from copy import deepcopy as copy import numpy as np +from collections.abc import Iterable +from copy import deepcopy as copy from .generic_pivoted_approximant import (GenericPivotedApproximantBase, GenericPivotedApproximantNoMatch, GenericPivotedApproximant) from .gather_pivoted_approximant import gatherPivotedApproximant from rrompy.reduction_methods.standard.rational_interpolant import ( RationalInterpolant) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical.hash_derivative import nextDerivativeIndices from rrompy.utilities.exception_manager import RROMPyAssert, RROMPyWarning from rrompy.parameter import emptyParameterList from rrompy.utilities.parallel import poolRank, indicesScatter, isend, recv __all__ = ['RationalInterpolantPivotedNoMatch', 'RationalInterpolantPivoted'] class RationalInterpolantPivotedBase(GenericPivotedApproximantBase, RationalInterpolant): def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(toBeExcluded = ["polydegreetype"]) super().__init__(*args, **kwargs) self._ignoreResidues = self.nparPivot > 1 self._postInit() @property def scaleFactorDer(self): """Value of scaleFactorDer.""" if self._scaleFactorDer == "NONE": return 1. if self._scaleFactorDer == "AUTO": return self.scaleFactorPivot return self._scaleFactorDer @scaleFactorDer.setter def scaleFactorDer(self, scaleFactorDer): if isinstance(scaleFactorDer, (str,)): scaleFactorDer = scaleFactorDer.upper() - elif hasattr(scaleFactorDer, "__len__"): + elif isinstance(scaleFactorDer, Iterable): scaleFactorDer = list(scaleFactorDer) self._scaleFactorDer = scaleFactorDer self._approxParameters["scaleFactorDer"] = self._scaleFactorDer @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if (self._musUniqueCN is None or len(self._reorder) != len(self.musPivot)): try: muPC = self.trainedModel.centerNormalizePivot(self.musPivot) except: muPC = self.trainedModel.centerNormalize(self.musPivot) self._musUniqueCN, musIdxsTo, musIdxs, musCount = (muPC.unique( return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.musPivot[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.nparPivot, cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt def setupApprox(self) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeScaleFactor() self.resetSamples() self.samplingEngine.scaleFactor = self.scaleFactorDer self.musPivot = self.samplerPivot.generatePoints(self.S) while len(self.musPivot) > self.S: self.musPivot.pop() self._musMarginal = self.samplerMarginal.generatePoints(self.SMarginal) while len(self.musMarginal) > self.SMarginal: self.musMarginal.pop() self._mus = emptyParameterList() self.mus.reset((self.S * self.SMarginal, self.HFEngine.npar)) for j, muMarg in enumerate(self.musMarginal): for k in range(j * self.S, (j + 1) * self.S): muk = np.empty_like(self.mus[0]) muk[self.directionPivot] = self.musPivot[k - j * self.S] muk[self.directionMarginal] = muMarg self.mus[k] = muk N0 = copy(self.N) self._setupTrainedModel(np.zeros((0, 0)), forceNew = True) idx, sizes = indicesScatter(len(self.musMarginal), return_sizes = True) pMat, Ps, Qs = None, [], [] req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(idx) == 0: vbMng(self, "MAIN", "Idling.", 30) if self.storeAllSamples: self.storeSamples() pL, pT = recv(source = 0, tag = poolRank()) pMat = np.empty((pL, 0), dtype = pT) else: _scaleFactorOldPivot = copy(self.scaleFactor) self.scaleFactor = self.scaleFactorPivot self._temporaryPivot = 1 for i in idx: vbMng(self, "MAIN", "Building marginal model no. {} at {}.".format(i + 1, self.musMarginal[i]), 5) vbMng(self, "INIT", "Starting computation of snapshots.", 10) self.samplingEngine.resetHistory() self.samplingEngine.iterSample( self.mus[self.S * i : self.S * (i + 1)]) vbMng(self, "DEL", "Done computing snapshots.", 10) self.verbosity -= 5 self.samplingEngine.verbosity -= 5 self._setupRational(self._setupDenominator()) self.verbosity += 5 self.samplingEngine.verbosity += 5 if self.storeAllSamples: self.storeSamples(i) if pMat is None: pMat = copy(self.samplingEngine.projectionMatrix) if i == 0: for dest in emptyCores: req += [isend((len(pMat), pMat.dtype), dest = dest, tag = dest)] else: pMat = np.hstack((pMat, self.samplingEngine.projectionMatrix)) Ps += [copy(self.trainedModel.data.P)] Qs += [copy(self.trainedModel.data.Q)] del self.trainedModel.data.Q, self.trainedModel.data.P self.N = N0 del self._temporaryPivot self.scaleFactor = _scaleFactorOldPivot for r in req: r.wait() pMat, Ps, Qs, _, _ = gatherPivotedApproximant(pMat, Ps, Qs, self.mus.data, sizes, self.polybasis, False) self._setupTrainedModel(pMat) self.trainedModel.data.Qs, self.trainedModel.data.Ps = Qs, Ps Psupp = np.arange(0, len(self.musMarginal) * self.S, self.S) self.trainedModel.data.Psupp = list(Psupp) self._poleMatching() self._finalizeMarginalization() vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 class RationalInterpolantPivotedNoMatch(RationalInterpolantPivotedBase, GenericPivotedApproximantNoMatch): """ ROM pivoted rational interpolant (without pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasis': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 'AUTO', i.e. maximum allowed; - 'N': degree of rational interpolant denominator; defaults to 'AUTO', i.e. maximum allowed; - 'radialDirectionalWeights': radial basis weights for pivot numerator; defaults to 1; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; defaults to [-1, -1]; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'NODAL', 'LOEWNER', and 'BARYCENTRIC'; defaults to 'NORM'; - 'interpRcond': tolerance for pivot interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'polybasis': type of polynomial basis for pivot interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'radialDirectionalWeights': radial basis weights for pivot numerator; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpRcond': tolerance for pivot interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasis: Type of polynomial basis for pivot interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. radialDirectionalWeights: Radial basis weights for pivot numerator. radialDirectionalWeightsAdapt: Bounds for adaptive rescaling of radial basis weights. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. functionalSolve: Strategy for minimization of denominator functional. interpRcond: Tolerance for pivot interpolation. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ class RationalInterpolantPivoted(RationalInterpolantPivotedBase, GenericPivotedApproximant): """ ROM pivoted rational interpolant (with pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'matchingMode': mode for pole matching optimization; allowed values include 'NONE' and 'SHIFT'; defaults to 'NONE'; - 'sharedRatio': required ratio of marginal points to share resonance; defaults to 1.; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasis': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL_*', 'CHEBYSHEV_*', 'LEGENDRE_*', 'NEARESTNEIGHBOR', and 'PIECEWISE_LINEAR_*'; defaults to 'MONOMIAL'; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; defaults to 'AUTO', i.e. maximum allowed; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'nNeighborsMarginal': number of marginal nearest neighbors; defaults to 1; only for 'NEARESTNEIGHBOR'; . 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; not for 'NEARESTNEIGHBOR' or 'PIECEWISE_LINEAR_*'; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights; only for radial basis. - 'M': degree of rational interpolant numerator; defaults to 'AUTO', i.e. maximum allowed; - 'N': degree of rational interpolant denominator; defaults to 'AUTO', i.e. maximum allowed; - 'radialDirectionalWeights': radial basis weights for pivot numerator; defaults to 1; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; defaults to [-1, -1]; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'NODAL', 'LOEWNER', and 'BARYCENTRIC'; defaults to 'NORM'; - 'interpRcond': tolerance for pivot interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'matchingWeight': weight for pole matching optimization; - 'matchingMode': mode for pole matching optimization; - 'sharedRatio': required ratio of marginal points to share resonance; - 'polybasis': type of polynomial basis for pivot interpolation; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'paramsMarginal': dictionary of parameters for marginal interpolation; include: . 'MMarginal': degree of marginal interpolant; . 'nNeighborsMarginal': number of marginal nearest neighbors; . 'polydegreetypeMarginal': type of polynomial degree for marginal; . 'interpRcondMarginal': tolerance for marginal interpolation; . 'radialDirectionalWeightsMarginalAdapt': bounds for adaptive rescaling of marginal radial basis weights. - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'radialDirectionalWeights': radial basis weights for pivot numerator; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpRcond': tolerance for pivot interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. matchingWeight: Weight for pole matching optimization. matchingMode: Mode for pole matching optimization. sharedRatio: Required ratio of marginal points to share resonance. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasis: Type of polynomial basis for pivot interpolation. polybasisMarginal: Type of polynomial basis for marginal interpolation. paramsMarginal: Dictionary of parameters for marginal interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. radialDirectionalWeights: Radial basis weights for pivot numerator. radialDirectionalWeightsAdapt: Bounds for adaptive rescaling of radial basis weights. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. functionalSolve: Strategy for minimization of denominator functional. interpRcond: Tolerance for pivot interpolation. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py index e3f3342..62b45be 100644 --- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py +++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py @@ -1,327 +1,328 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np -from copy import deepcopy as copy from scipy.special import factorial as fact +from collections.abc import Iterable +from copy import deepcopy as copy from itertools import combinations from rrompy.reduction_methods.standard.trained_model.trained_model_rational \ import TrainedModelRational from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside, HeavisideInterpolator as HI) from rrompy.utilities.poly_fitting.nearest_neighbor import ( NearestNeighborInterpolator as NNI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList from rrompy.sampling import sampleList, emptySampleList __all__ = ['TrainedModelPivotedRationalNoMatch'] class TrainedModelPivotedRationalNoMatch(TrainedModelRational): """ ROM approximant evaluation for pivoted approximants based on interpolation of rational approximants (without pole matching). Attributes: Data: dictionary with all that can be pickled. """ def checkParameterListPivot(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparPivot, check_if_single) def checkParameterListMarginal(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparMarginal, check_if_single) def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if not collapse and tol <= 0.: return RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args, **kwargs) for obj, suppj in zip(self.data.HIs, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_HIsExcl"): for obj, suppj in zip(self._HIsExcl, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self.data, "Ps"): for obj, suppj in zip(self.data.Ps, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_PsExcl"): for obj, suppj in zip(self._PsExcl, self._PsuppExcl): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self.data, "coeffsEff"): for j in range(len(self.data.coeffsEff)): self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T) if hasattr(self, "_HIsExcl") or hasattr(self, "_PsExcl"): self._PsuppExcl = [0] * len(self._PsuppExcl) self.data.Psupp = [0] * len(self.data.Psupp) super(TrainedModelRational, self).compress(collapse, tol) def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = self.checkParameterListPivot(mu) if mu0 is None: mu0 = self.checkParameterListPivot( self.data.mu0(0, self.data.directionPivot)) return (self.mapParameterList(mu, idx = self.data.directionPivot) - self.mapParameterList(mu0, idx = self.data.directionPivot) ) / [self.data.scaleFactor[x] for x in self.data.directionPivot] def setupMarginalInterp(self, interpPars:ListAny): self.data.marginalInterp = NNI() self.data.marginalInterp.setupByInterpolation(self.data.musMarginal, np.arange(len(self.data.musMarginal)), 1, *interpPars) def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs): if hasattr(self, "_idxExcl"): for j, excl in enumerate(self._idxExcl): self.data.musMarginal.insert(self._musMExcl[j], excl) self.data.HIs.insert(excl, self._HIsExcl[j]) self.data.Ps.insert(excl, self._PsExcl[j]) self.data.Qs.insert(excl, self._QsExcl[j]) self.data.Psupp.insert(excl, self._PsuppExcl[j]) self._idxExcl, self._musMExcl = list(np.sort(exclude)), [] self._HIsExcl, self._PsExcl, self._QsExcl = [], [], [] self._PsuppExcl = [] for excl in self._idxExcl[::-1]: self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl self.data.musMarginal.pop(excl) self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl self._PsuppExcl = [self.data.Psupp.pop(excl)] + self._PsuppExcl poles = [hi.poles for hi in self.data.HIs] coeffs = [hi.coeffs for hi in self.data.HIs] self.initializeFromLists(poles, coeffs, self.data.Psupp, self.data.HIs[0].polybasis, *args, **kwargs) def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny, basis:str, *args, **kwargs): """Initialize Heaviside representation.""" self.data.HIs = [] for pls, cfs in zip(poles, coeffs): hsi = HI() hsi.poles = pls if len(cfs) == len(pls): cfs = np.pad(cfs, ((0, 1), (0, 0)), "constant") hsi.coeffs = cfs hsi.npar = 1 hsi.polybasis = basis self.data.HIs += [hsi] def initializeFromRational(self, *args, **kwargs): """Initialize Heaviside representation.""" RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") poles, coeffs = [], [] for Q, P in zip(self.data.Qs, self.data.Ps): cfs, pls, basis = rational2heaviside(P, Q) poles += [pls] coeffs += [cfs] self.initializeFromLists(poles, coeffs, self.data.Psupp, basis, *args, **kwargs) def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant interpolator.""" mu = self.checkParameterListMarginal(mu) vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu), 95) his = [] intM = np.array(self.data.marginalInterp(mu), dtype = int) for j in range(len(mu)): i = intM[j] his += [HI()] his[-1].poles = copy(self.data.HIs[i].poles) his[-1].coeffs = copy(self.data.HIs[i].coeffs) his[-1].npar = 1 his[-1].polybasis = self.data.HIs[0].polybasis if not self.data._collapsed: his[-1].pad(self.data.Psupp[i], self.data.projMat.shape[1] - self.data.Psupp[i] - his[-1].shape[0]) vbMng(self, "DEL", "Done finding nearest neighbor.", 95) return his def interpolateMarginalPoles(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant poles.""" interps = self.interpolateMarginalInterpolator(mu) return [interp.poles for interp in interps] def interpolateMarginalCoeffs(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant poles.""" interps = self.interpolateMarginalInterpolator(mu) return [interp.coeffs for interp in interps] def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = mu(self.data.directionMarginal) his = self.interpolateMarginalInterpolator(muM) for i, (mP, hi) in enumerate(zip(muP, his)): uAppR = hi(mP)[:, 0] if i == 0: uApproxR = np.empty((len(uAppR), len(mu)), dtype = uAppR.dtype) uApproxR[:, i] = uAppR self.uApproxReduced = sampleList(uApproxR) vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) p = emptySampleList() muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = mu(self.data.directionMarginal) his = self.interpolateMarginalInterpolator(muM) for i, (mP, hi) in enumerate(zip(muP, his)): Pval = hi(mP) * np.prod(mP[0] - hi.poles) if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype) p[i] = Pval return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = mu(self.data.directionMarginal) if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] derVal = np.zeros(len(mu), dtype = np.complex) pls = self.interpolateMarginalPoles(muM) for i, (mP, pl) in enumerate(zip(muP, pls)): N = len(pl) if derP == N: derVal[i] = 1. elif derP >= 0 and derP < N: plDist = muP[0] - pl for terms in combinations(np.arange(N), N - derP): derVal[i] += np.prod(plDist[list(terms)], axis = 1) return sclP ** derP * fact(derP) * derVal def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] - if not hasattr(mVals, "__len__"): mVals = [mVals] + if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim)[0] mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] roots = (self.data.scaleFactor[rDim] * self.interpolateMarginalPoles(mMarg)[0]) return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim), idx = [rDim])(0, 0) + roots, "B", [rDim])(0) def getResidues(self, *args, **kwargs) -> Np2D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] - if not hasattr(mVals, "__len__"): mVals = [mVals] + if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :] if not self.data._collapsed: res = self.data.projMat.dot(res.T).T return pls, res diff --git a/rrompy/reduction_methods/standard/nearest_neighbor.py b/rrompy/reduction_methods/standard/nearest_neighbor.py index 932856c..9e38bbc 100644 --- a/rrompy/reduction_methods/standard/nearest_neighbor.py +++ b/rrompy/reduction_methods/standard/nearest_neighbor.py @@ -1,165 +1,166 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # -from copy import deepcopy as copy import numpy as np +from collections.abc import Iterable +from copy import deepcopy as copy from .generic_standard_approximant import GenericStandardApproximant from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting.nearest_neighbor import ( NearestNeighborInterpolator as NNI) from rrompy.utilities.exception_manager import RROMPyAssert __all__ = ['NearestNeighbor'] class NearestNeighbor(GenericStandardApproximant): """ ROM nearest neighbor approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'nNeighbors': number of nearest neighbors; defaults to 1; - 'radialDirectionalWeights': directional weights for computation of parameter distance; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'nNeighbors': number of nearest neighbors; - 'radialDirectionalWeights': directional weights for computation of parameter distance. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. nNeighbors: Number of nearest neighbors. radialDirectionalWeights: Directional weights for computation of parameter distance. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["nNeighbors", "radialDirectionalWeights"], [1, 1.]) super().__init__(*args, **kwargs) self._postInit() @property def tModelType(self): from .trained_model.trained_model_nearest_neighbor import ( TrainedModelNearestNeighbor) return TrainedModelNearestNeighbor @property def nNeighbors(self): """Value of nNeighbors.""" return self._nNeighbors @nNeighbors.setter def nNeighbors(self, nNeighbors): self._nNeighbors = max(1, nNeighbors) self._approxParameters["nNeighbors"] = self.nNeighbors @property def radialDirectionalWeights(self): """Value of radialDirectionalWeights.""" return self._radialDirectionalWeights @radialDirectionalWeights.setter def radialDirectionalWeights(self, radialDirectionalWeights): - if hasattr(radialDirectionalWeights, "__len__"): + if isinstance(radialDirectionalWeights, Iterable): radialDirectionalWeights = list(radialDirectionalWeights) else: radialDirectionalWeights = [radialDirectionalWeights] self._radialDirectionalWeights = radialDirectionalWeights self._approxParameters["radialDirectionalWeights"] = ( self.radialDirectionalWeights) def setupApprox(self) -> int: """Compute RB projection matrix.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() setData = self.trainedModel is None self._setupTrainedModel(self.samplingEngine.projectionMatrix) if setData: self.trainedModel.data.NN = NNI() if self.POD: R = self.samplingEngine.RPOD if isinstance(R, (np.ndarray,)): vals, supp = list(R.T), [0] * R.shape[1] else: vals, supp = [], [] for j in range(R.shape[1]): idx = R.indices[R.indptr[j] : R.indptr[j + 1]] if len(idx) == 0: supp += [0] val = np.empty(0, dtype = R.dtype) else: supp += [idx[0]] idx = idx - idx[0] val = np.zeros(idx[-1] + 1, dtype = R.dtype) val[idx] = R.data[R.indptr[j] : R.indptr[j + 1]] vals += [val] else: vals = [np.ones(1)] * len(self.mus) supp = list(range(len(self.mus))) self.trainedModel.data.NN.setupByInterpolation(self.mus, np.arange(len(self.mus)), self.nNeighbors, self.radialDirectionalWeights) self.trainedModel.data.vals, self.trainedModel.data.supp = vals, supp self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 diff --git a/rrompy/reduction_methods/standard/rational_interpolant.py b/rrompy/reduction_methods/standard/rational_interpolant.py index ffa33bc..0a8fd70 100644 --- a/rrompy/reduction_methods/standard/rational_interpolant.py +++ b/rrompy/reduction_methods/standard/rational_interpolant.py @@ -1,793 +1,794 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from scipy.linalg import eigvals +from collections.abc import Iterable from rrompy.reduction_methods.base import checkRobustTolerance from .generic_standard_approximant import GenericStandardApproximant from rrompy.utilities.poly_fitting.polynomial import ( polybases as ppb, polyfitname, polyvander as pvP, polyTimes, polyTimesTable, vanderInvTable, PolynomialInterpolator as PI, PolynomialInterpolatorNodal as PIN) from rrompy.utilities.poly_fitting.heaviside import rational2heaviside from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, sampList, interpEng) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import pseudoInverse, dot, potential from rrompy.utilities.numerical.hash_derivative import nextDerivativeIndices from rrompy.utilities.numerical.degree import (reduceDegreeN, degreeTotalToFull, fullDegreeMaxMask, totalDegreeMaxMask) from rrompy.solver import Np2DLikeGramian from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalInterpolant'] class RationalInterpolant(GenericStandardApproximant): """ ROM rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 'AUTO', i.e. maximum allowed; - 'N': degree of rational interpolant denominator; defaults to 'AUTO', i.e. maximum allowed; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; defaults to 1; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; defaults to [-1, -1]; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'NODAL', 'LOEWNER', and 'BARYCENTRIC'; defaults to 'NORM'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. radialDirectionalWeights: Radial basis weights for interpolant numerator. radialDirectionalWeightsAdapt: Bounds for adaptive rescaling of radial basis weights. functionalSolve: Strategy for minimization of denominator functional. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["polybasis", "M", "N", "polydegreetype", "radialDirectionalWeights", "radialDirectionalWeightsAdapt", "functionalSolve", "interpRcond", "robustTol"], ["MONOMIAL", "AUTO", "AUTO", "TOTAL", 1., [-1., -1.], "NORM", -1, 0.]) super().__init__(*args, **kwargs) self.catchInstability = 0 self._postInit() @property def tModelType(self): from .trained_model.trained_model_rational import TrainedModelRational return TrainedModelRational @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in ppb + rbpb: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def polybasis0(self): if "_" in self.polybasis: return self.polybasis.split("_")[0] return self.polybasis @property def functionalSolve(self): """Value of functionalSolve.""" return self._functionalSolve @functionalSolve.setter def functionalSolve(self, functionalSolve): try: functionalSolve = functionalSolve.upper().strip().replace(" ","") if functionalSolve not in ["NORM", "DOMINANT", "NODAL", "LOEWNER", "BARYCENTRIC"]: raise RROMPyException(("Prescribed functionalSolve not " "recognized.")) self._functionalSolve = functionalSolve except: RROMPyWarning(("Prescribed functionalSolve not recognized. " "Overriding to 'NORM'.")) self._functionalSolve = "NORM" self._approxParameters["functionalSolve"] = self.functionalSolve @property def interpRcond(self): """Value of interpRcond.""" return self._interpRcond @interpRcond.setter def interpRcond(self, interpRcond): self._interpRcond = interpRcond self._approxParameters["interpRcond"] = self.interpRcond @property def radialDirectionalWeights(self): """Value of radialDirectionalWeights.""" return self._radialDirectionalWeights @radialDirectionalWeights.setter def radialDirectionalWeights(self, radialDirectionalWeights): - if hasattr(radialDirectionalWeights, "__len__"): + if isinstance(radialDirectionalWeights, Iterable): radialDirectionalWeights = list(radialDirectionalWeights) else: radialDirectionalWeights = [radialDirectionalWeights] self._radialDirectionalWeights = radialDirectionalWeights self._approxParameters["radialDirectionalWeights"] = ( self.radialDirectionalWeights) @property def radialDirectionalWeightsAdapt(self): """Value of radialDirectionalWeightsAdapt.""" return self._radialDirectionalWeightsAdapt @radialDirectionalWeightsAdapt.setter def radialDirectionalWeightsAdapt(self, radialDirectionalWeightsAdapt): self._radialDirectionalWeightsAdapt = radialDirectionalWeightsAdapt self._approxParameters["radialDirectionalWeightsAdapt"] = ( self.radialDirectionalWeightsAdapt) @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if isinstance(M, str): M = M.strip().replace(" ","") if "-" not in M: M = M + "-0" self._M_isauto, self._M_shift = True, int(M.split("-")[-1]) M = 0 if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M def _setMAuto(self): self.M = max(0, reduceDegreeN(self.S, self.S, self.npar, self.polydegreetype) - self._M_shift) vbMng(self, "MAIN", "Automatically setting M to {}.".format(self.M), 25) @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if isinstance(N, str): N = N.strip().replace(" ","") if "-" not in N: N = N + "-0" self._N_isauto, self._N_shift = True, int(N.split("-")[-1]) N = 0 if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N def _setNAuto(self): self.N = max(0, reduceDegreeN(self.S, self.S, self.npar, self.polydegreetype) - self._N_shift) vbMng(self, "MAIN", "Automatically setting N to {}.".format(self.N), 25) @property def polydegreetype(self): """Value of polydegreetype.""" return self._polydegreetype @polydegreetype.setter def polydegreetype(self, polydegreetype): try: polydegreetype = polydegreetype.upper().strip().replace(" ","") if polydegreetype not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetype not " "recognized.")) self._polydegreetype = polydegreetype except: RROMPyWarning(("Prescribed polydegreetype not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetype = "TOTAL" self._approxParameters["polydegreetype"] = self.polydegreetype @property def robustTol(self): """Value of tolerance for robust rational denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol def resetSamples(self): """Reset samples.""" super().resetSamples() self._musUniqueCN = None self._derIdxs = None self._reorder = None def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" if self._musUniqueCN is None or len(self._reorder) != len(self.mus): self._musUniqueCN, musIdxsTo, musIdxs, musCount = ( self.trainedModel.centerNormalize(self.mus).unique( return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.mus[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1], cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator.", 7) if hasattr(self, "_N_isauto"): self._setNAuto() else: N = reduceDegreeN(self.N, self.S, self.npar, self.polydegreetype) if N < self.N: RROMPyWarning(("N too large compared to S. Reducing N by " "{}").format(self.N - N)) self.N = N while self.N > 0: if self.functionalSolve != "NORM" and self.npar > 1: RROMPyWarning(("Strategy for functional optimization must be " "'NORM' for more than one parameter. " "Overriding to 'NORM'.")) self.functionalSolve = "NORM" if self.functionalSolve == "BARYCENTRIC" and self.N + 1 < self.S: RROMPyWarning(("Barycentric strategy cannot be applied with " "Least Squares. Overriding to 'NORM'.")) self.functionalSolve = "NORM" if self.functionalSolve == "BARYCENTRIC": invD, TN = None, None self._setupInterpolationIndices() else: invD, TN = self._computeInterpolantInverseBlocks() if (self.functionalSolve in ["NODAL", "LOEWNER", "BARYCENTRIC"] and len(self._musUnique) != len(self.mus)): if self.functionalSolve == "BARYCENTRIC": warnflag = "Barycentric" else: warnflag = "Iterative" RROMPyWarning(("{} functional optimization cannot be applied " "to repeated samples. Overriding to " "'NORM'.").format(warnflag)) self.functionalSolve = "NORM" idxSamplesEff = list(range(self.S)) if self.POD: ev, eV = self.findeveVGQR( self.samplingEngine.RPOD[:, idxSamplesEff], invD, TN) else: ev, eV = self.findeveVGExplicit( self.samplingEngine.samples(idxSamplesEff), invD, TN) if self.functionalSolve in ["NODAL", "LOEWNER"]: break nevBad = checkRobustTolerance(ev, self.robustTol) if nevBad <= (self.functionalSolve == "NORM"): break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: eigenproblem is poorly " "conditioned."), self.catchInstability == 1) vbMng(self, "MAIN", ("Smallest {} eigenvalues below tolerance. " "Reducing N by 1.").format(nevBad), 10) self.N = self.N - 1 if self.N <= 0: self.N = 0 eV = np.ones((1, 1)) if self.N > 0 and self.functionalSolve in ["NODAL", "LOEWNER", "BARYCENTRIC"]: q = PIN() q.polybasis, q.nodes = self.polybasis0, eV else: q = PI() q.npar = self.npar q.polybasis = self.polybasis0 if self.polydegreetype == "TOTAL": q.coeffs = degreeTotalToFull(tuple([self.N + 1] * self.npar), self.npar, eV) else: q.coeffs = eV.reshape([self.N + 1] * self.npar) vbMng(self, "DEL", "Done computing denominator.", 7) return q def _setupNumerator(self): """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of numerator.", 7) self._setupInterpolationIndices() Qevaldiag = polyTimesTable(self.trainedModel.data.Q, self._musUniqueCN, self._reorder, self._derIdxs, self.scaleFactorRel) if self.POD: Qevaldiag = Qevaldiag.dot(self.samplingEngine.RPOD.T) if hasattr(self, "_M_isauto"): self._setMAuto() M = self.M else: M = reduceDegreeN(self.M, self.S, self.npar, self.polydegreetype) if M < self.M: RROMPyWarning(("M too large compared to S. Reducing M by " "{}").format(self.M - M)) self.M = M while self.M >= 0: pParRest = [self.M, self.polybasis, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": self.scaleFactorRel}] if self.polybasis in ppb: p = PI() else: self.computeScaleFactor() rDWEff = np.array([w * f for w, f in zip( self.radialDirectionalWeights, self.scaleFactor)]) pParRest = pParRest[: 2] + [rDWEff] + pParRest[2 :] pParRest[-1]["optimizeScalingBounds"] = ( self.radialDirectionalWeightsAdapt) p = RBI() if self.polybasis in ppb + rbpb: pParRest += [{"rcond": self.interpRcond}] wellCond, msg = p.setupByInterpolation(self._musUniqueCN, Qevaldiag, *pParRest) vbMng(self, "MAIN", msg, 5) if wellCond: break if self.catchInstability: raise RROMPyException(("Instability in numerator computation: " "polyfit is poorly conditioned."), self.catchInstability == 1) vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing M " "by 1."), 10) self.M = self.M - 1 if self.M < 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) self.M = M vbMng(self, "DEL", "Done computing numerator.", 7) return p def setupApprox(self) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() self._setupTrainedModel(self.samplingEngine.projectionMatrix) self._setupRational(self._setupDenominator()) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 def _setupRational(self, Q:interpEng, P : interpEng = None): vbMng(self, "INIT", "Starting approximant finalization.", 5) self.trainedModel.data.Q = Q if P is None: P = self._setupNumerator() if self.N > 0 and self.npar == 1: pls = Q.roots() idxBad = self.HFEngine.flagBadPolesResidues(pls, relative = True) plsN = self.mapParameterList(self.mapParameterList(self.mu0)(0, 0) + self.scaleFactor * pls, "B")(0) idxBad = np.logical_or(self.HFEngine.flagBadPolesResidues(pls, relative = True), self.HFEngine.flagBadPolesResidues(plsN)) if np.any(idxBad): vbMng(self, "MAIN", "Removing {} spurious poles out of {} due to poles."\ .format(np.sum(idxBad), self.N), 10) if isinstance(Q, PIN): Q.nodes = Q.nodes[np.logical_not(idxBad)] else: Q = PI() Q.npar = self.npar Q.polybasis = self.polybasis0 Q.coeffs = np.ones(1, dtype = np.complex) for pl in pls[np.logical_not(idxBad)]: Q.coeffs = polyTimes(Q.coeffs, [- pl, 1.], Pbasis = Q.polybasis, Rbasis = Q.polybasis) Q.coeffs /= np.linalg.norm(Q.coeffs) self.trainedModel.data.Q = Q self.N = Q.deg[0] P = self._setupNumerator() if (not hasattr(self.HFEngine, "_ignoreResidues") or not self.HFEngine._ignoreResidues): cfs, pls, _ = rational2heaviside(P, Q) cfs = cfs[: self.N].T if not self.POD: cfs = self.samplingEngine.projectionMatrix.dot(cfs) foci = self.sampler.normalFoci() ground = self.sampler.groundPotential() potEff = potential(pls, foci) / ground potEff[np.logical_or(potEff < 1., np.isinf(pls))] = 1. cfs[:, np.isinf(pls)] = 0. cfs /= potEff # rescale by potential idxBad = self.HFEngine.flagBadPolesResidues(pls, cfs) if np.any(idxBad): vbMng(self, "MAIN", ("Removing {} spurious poles out of {} due to " "residues.").format(np.sum(idxBad), self.N), 10) if isinstance(Q, PIN): Q.nodes = Q.nodes[np.logical_not(idxBad)] else: Q = PI() Q.npar = self.npar Q.polybasis = self.polybasis0 Q.coeffs = np.ones(1, dtype = np.complex) for pl in pls[np.logical_not(idxBad)]: Q.coeffs = polyTimes(Q.coeffs, [- pl, 1.], Pbasis = Q.polybasis, Rbasis = Q.polybasis) Q.coeffs /= np.linalg.norm(Q.coeffs) self.trainedModel.data.Q = Q self.N = Q.deg[0] P = self._setupNumerator() self.trainedModel.data.P = P vbMng(self, "DEL", "Terminated approximant finalization.", 5) def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupInterpolationIndices() pvPPar = [self.polybasis0, self._derIdxs, self._reorder, self.scaleFactorRel] if hasattr(self, "_M_isauto"): self._setMAuto() E = max(self.M, self.N) while E >= 0: if self.polydegreetype == "TOTAL": Eeff = E idxsB = totalDegreeMaxMask(E, self.npar) else: #if self.polydegreetype == "FULL": Eeff = [E] * self.npar idxsB = fullDegreeMaxMask(E, self.npar) TE = pvP(self._musUniqueCN, Eeff, *pvPPar) fitOut = pseudoInverse(TE, rcond = self.interpRcond, full = True) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... " "Conditioning of pseudoinverse system: {:.4e}.").format( TE.shape[0], E, polyfitname(self.polybasis0), fitOut[1][1][0] / fitOut[1][1][-1]), 5) if fitOut[1][0] == TE.shape[1]: fitinv = fitOut[0][idxsB, :] break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: polyfit is poorly " "conditioned."), self.catchInstability == 1) EeqN = E == self.N vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing E {}" "by 1.").format("and N " * EeqN), 10) if EeqN: self.N = self.N - 1 E -= 1 if self.N < 0: raise RROMPyException(("Instability in computation of " "denominator. Aborting.")) invD = vanderInvTable(fitinv, idxsB, self._reorder, self._derIdxs) if self.N == E: TN = TE else: if self.polydegreetype == "TOTAL": Neff = self.N idxsB = totalDegreeMaxMask(self.N, self.npar) else: #if self.polydegreetype == "FULL": Neff = [self.N] * self.npar idxsB = fullDegreeMaxMask(self.N, self.npar) TN = pvP(self._musUniqueCN, Neff, *pvPPar) return invD, TN def findeveVGExplicit(self, sampleE:sampList, invD:List[Np2D], TN:Np2D) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of rational denominator matrix. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") vbMng(self, "INIT", "Building gramian matrix.", 10) gramian = self.HFEngine.innerProduct(sampleE, sampleE, is_state = self.approx_state) if self.functionalSolve == "NODAL": SEnd = invD[0].shape[1] G0 = np.zeros((SEnd,) * 2, dtype = np.complex) elif self.functionalSolve == "LOEWNER": G0 = gramian if self.functionalSolve == "BARYCENTRIC": nEnd = len(gramian) - 1 else: nEnd = TN.shape[1] G = np.zeros((nEnd, nEnd), dtype = np.complex) for k in range(len(invD)): iDkN = dot(invD[k], TN) G += dot(dot(gramian, iDkN).T, iDkN.conj()).T if self.functionalSolve == "NODAL": G0 += dot(dot(gramian, invD[k]).T, invD[k].conj()).T vbMng(self, "DEL", "Done building gramian.", 10) if self.functionalSolve == "NORM": ev, eV = np.linalg.eigh(G) eV = eV[:, 0] problem = "eigenproblem" else: if self.functionalSolve == "BARYCENTRIC": fitOut = pseudoInverse(gramian, rcond = self.interpRcond, full = True) barWeigths = fitOut[0].dot(np.ones(nEnd + 1)) eV = self.findeVBarycentric(barWeigths / np.sum(barWeigths)) else: fitOut = pseudoInverse(G[:-1, :-1], rcond = self.interpRcond, full = True) eV = np.append(fitOut[0].dot(G[:-1, -1]), -1.) ev = fitOut[1][1][::-1] problem = "linear problem" vbMng(self, "MAIN", ("Solved {} of size {} with condition number " "{:.4e}.").format(problem, nEnd, ev[-1] / ev[0]), 5) if self.functionalSolve in ["NODAL", "LOEWNER"]: q = PI() q.npar, q.polybasis, q.coeffs = self.npar, self.polybasis0, eV eV, tol, niter, passed = self.findeVNewton(q.roots(), G0) if passed: vbMng(self, "MAIN", ("Newton algorithm for problem of size {} converged " "(tol = {:.4e}) in {} iterations.").format(nEnd, tol, niter), 5) else: RROMPyWarning(("Newton algorithm for problem of size {} did " "not converge (tol = {:.4e}) after {} " "iterations.").format(nEnd, tol, niter)) return ev, eV def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D], TN:Np2D) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of rational denominator matrix through SVD of R factor. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") vbMng(self, "INIT", "Building half-gramian matrix stack.", 10) if self.functionalSolve == "NODAL": gramian = Np2DLikeGramian(None, dot(RPODE, invD[0])) elif self.functionalSolve == "LOEWNER": gramian = Np2DLikeGramian(None, RPODE) if self.functionalSolve == "BARYCENTRIC": nEnd = RPODE.shape[1] - 1 else: S, nEnd, eWidth = RPODE.shape[0], TN.shape[1], len(invD) Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex) for k in range(eWidth): Rstack[k * S : (k + 1) * S, :] = dot(RPODE, dot(invD[k], TN)) vbMng(self, "DEL", "Done building half-gramian.", 10) if self.functionalSolve in ["NORM", "BARYCENTRIC"]: if self.functionalSolve == "NORM": _, s, Vh = np.linalg.svd(Rstack, full_matrices = False) eV = Vh[-1, :].conj() else: #if self.functionalSolve == "BARYCENTRIC": _, s, Vh = np.linalg.svd(RPODE, full_matrices = False) s[np.logical_not(np.isclose(s, 0.))] **= -2. barWeigths = (Vh.T.conj() * s).dot(Vh.dot(np.ones(nEnd + 1))) eV = self.findeVBarycentric(barWeigths / np.sum(barWeigths)) ev = s[::-1] problem = "svd problem" else: fitOut = pseudoInverse(Rstack[:, :-1], rcond = self.interpRcond, full = True) ev = fitOut[1][1][::-1] eV = np.append(fitOut[0].dot(Rstack[:, -1]), -1.) problem = "linear problem" vbMng(self, "MAIN", ("Solved {} of size {} with condition number " "{:.4e}.").format(problem, nEnd, ev[-1] / ev[0]), 5) if self.functionalSolve in ["NODAL", "LOEWNER"]: q = PI() q.npar, q.polybasis, q.coeffs = self.npar, self.polybasis0, eV eV, tol, niter, passed = self.findeVNewton(q.roots(), gramian) if passed: vbMng(self, "MAIN", ("Newton algorithm for problem of size {} converged " "(tol = {:.4e}) in {} iterations.").format(nEnd, tol, niter), 5) else: RROMPyWarning(("Newton algorithm for problem of size {} did " "not converge (tol = {:.2e}) after {} " "iterations.").format(nEnd, tol, niter)) return ev, eV def findeVBarycentric(self, baryWeights:Np1D) -> Np1D: RROMPyAssert(self._mode, message = "Cannot solve optimization problem.") arrow = np.pad(np.diag(self._musUniqueCN.data[ self._reorder].flatten()), (1, 0), "constant", constant_values = 1.) + 0.j arrow[0, 0] = 0. arrow[0, 1:] = baryWeights active = np.pad(np.eye(len(baryWeights)), (1, 0), "constant") eV = eigvals(arrow, active) return eV[np.logical_not(np.isinf(eV))] def findeVNewton(self, nodes0:Np1D, gram:Np2D, maxiter : int = 25, tolNewton : float = 1e-10) \ -> Tuple[Np1D, float, int, bool]: RROMPyAssert(self._mode, message = "Cannot solve optimization problem.") algo = self.functionalSolve N = len(nodes0) nodes = nodes0 grad = np.zeros(N, dtype = np.complex) hess = np.zeros((N, N), dtype = np.complex) mu = np.repeat(self._musUniqueCN.data[self._reorder], N, axis = 1) for niter in range(maxiter): if algo == "NODAL": plDist = mu - nodes.reshape(1, -1) q0, q = np.prod(plDist, axis = 1), [] elif algo == "LOEWNER": loew = np.pad(np.power(mu - nodes.reshape(1, -1), -1.), [(0, 0), (1, 0)], 'constant', constant_values = 1.) loewI = pseudoInverse(loew) Ids = [] for jS in range(N): if algo == "NODAL": mask = np.arange(N) != jS q += [np.prod(plDist[:, mask], axis = 1)] grad[jS] = q[-1].conj().dot(gram.dot(q0)) elif algo == "LOEWNER": Ids += [loewI.dot(np.power(loew[:, 1 + jS], 2.))] zIj, jI = Ids[-1][0], loewI[1 + jS] grad[jS] = (zIj * jI).conj().dot(gram.dot(loewI[0])) for iS in range(jS + 1): if algo == "NODAL": if iS == jS: hij = 0. sij = q[-1].conj().dot(gram.dot(q[-1])) else: mask = np.logical_and(np.arange(N) != iS, np.arange(N) != jS) qij = np.prod(plDist[:, mask], axis = 1) hij = qij.conj().dot(gram.dot(q0)) sij = q[-1].conj().dot(gram.dot(q[iS])) elif algo == "LOEWNER": zIi, iIj = Ids[iS][0], Ids[-1][1 + iS] hij = (zIi * iIj * jI).conj().dot(gram.dot(loewI[0])) if iS == jS: iI = jI zIdd = loewI[0].dot(np.power(loew[:, 1 + jS], 3.)) hij += (zIdd * jI).conj().dot(gram.dot(loewI[0])) hij *= 2. else: jIi, iI = Ids[iS][1 + jS], loewI[1 + iS] hij += (zIj * jIi * iI).conj().dot( gram.dot(loewI[0])) sij = (zIj * jI).conj().dot(gram.dot(zIi * iI)) hess[jS, iS] = hij + sij if iS < jS: hess[iS, jS] = hij + sij.conj() dnodes = np.linalg.solve(hess, grad) nodes += dnodes tol = np.linalg.norm(dnodes) / np.linalg.norm(nodes) if tol < tolNewton: break return nodes, tol, niter, niter < maxiter def getResidues(self, *args, **kwargs) -> Np2D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues(*args, **kwargs) diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py index b5c093b..a68fd29 100644 --- a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py +++ b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py @@ -1,187 +1,188 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from rrompy.reduction_methods.base.trained_model.trained_model import ( TrainedModel) from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.base.types import (Np1D, Np2D, List, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import emptyParameterList from rrompy.sampling import sampleList __all__ = ['TrainedModelRational'] class TrainedModelRational(TrainedModel): """ ROM approximant evaluation for rational approximant. Attributes: Data: dictionary with all that can be pickled. """ def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if not collapse and tol <= 0.: return RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args, **kwargs) self.data.P.postmultiplyTensorize(RMat.T) super().compress(collapse, tol) def centerNormalize(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0. Returns: Normalized parameter. """ mu = self.checkParameterList(mu) if mu0 is None: mu0 = self.data.mu0 return (self.mapParameterList(mu) - self.mapParameterList(mu0)) / self.data.scaleFactor def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17) p = sampleList(self.data.P(self.centerNormalize(mu))) vbMng(self, "DEL", "Done evaluating numerator.", 17) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ mu = self.checkParameterList(mu) vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu), 17) q = self.data.Q(self.centerNormalize(mu), der, scl) vbMng(self, "DEL", "Done evaluating denominator.", 17) return q def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) QV = self.getQVal(mu) QVzero = np.where(QV == 0.)[0] if len(QVzero) > 0: QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0]) self.uApproxReduced = self.getPVal(mu) / QV vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] - if not hasattr(mVals, "__len__"): mVals = [mVals] + if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) mVals[rDim] = self.data.mu0(rDim) mVals = list(self.centerNormalize(mVals).data.flatten()) mVals[rDim] = fp roots = self.data.scaleFactor[rDim] * self.data.Q.roots(mVals) return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim), idx = [rDim])(0, 0) + roots, "B", [rDim])(0) def getResidues(self, *args, **kwargs) -> Np2D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] - if not hasattr(mVals, "__len__"): mVals = [mVals] + if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) poles = emptyParameterList() poles.reset((len(pls), self.data.npar), dtype = pls.dtype) for k, pl in enumerate(pls): mValsLoc = list(mVals) mValsLoc[rDim] = pl poles[k] = mValsLoc QV = self.getQVal(poles, list(1 * (np.arange(self.data.npar) == rDim))) QVzero = np.where(QV == 0.)[0] if len(QVzero) > 0: RROMPyWarning(("Adjusting residuals to avoid division by " "numerically zero denominator.")) QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0]) Res = self.getPVal(poles) if not self.data._collapsed: Res = sampleList(dot(self.data.projMat[:, : Res.shape[0]], Res)) res = Res / QV return pls, res.T diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py b/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py index 6b5883e..4e2bfb4 100644 --- a/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py +++ b/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py @@ -1,151 +1,153 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from rrompy.reduction_methods.base.trained_model.trained_model import ( TrainedModel) from rrompy.reduction_methods.base.reduced_basis_utils import ( projectAffineDecomposition) from rrompy.utilities.base.types import (Np1D, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.numerical.marginalize_poly_list import ( marginalizePolyList) from rrompy.utilities.numerical.nonlinear_eigenproblem import ( eigvalsNonlinearDense) from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import checkParameter from rrompy.sampling import sampleList from rrompy.utilities.parallel import (poolRank, masterCore, listScatter, matrixGatherv, isend, recv) __all__ = ['TrainedModelReducedBasis'] class TrainedModelReducedBasis(TrainedModel): """ ROM approximant evaluation for RB approximant. Attributes: Data: dictionary with all that can be pickled. """ def reset(self): super().reset() if hasattr(self, "data") and hasattr(self.data, "lastSetupMu"): self.data.lastSetupMu = None def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if collapse: raise RROMPyException("Cannot collapse implicit surrogates.") if tol <= 0.: return if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(self.data.projMat, tol, *args, **kwargs) self.data.ARBs, self.data.bRBs = projectAffineDecomposition( self.data.ARBs, self.data.bRBs, RMat) super().compress(collapse, tol) def assembleReducedModel(self, mu:paramVal): mu = checkParameter(mu, self.data.npar) if not (hasattr(self.data, "lastSetupMu") and self.data.lastSetupMu == mu): vbMng(self, "INIT", "Assembling reduced model for mu = {}."\ .format(mu), 17) muEff = self.mapParameterList(mu) self.data.ARBmu, self.data.bRBmu = 0., 0. for thA, ARB in zip(self.data.thAs, self.data.ARBs): self.data.ARBmu = (expressionEvaluator(thA[0], muEff) * ARB + self.data.ARBmu) for thb, bRB in zip(self.data.thbs, self.data.bRBs): self.data.bRBmu = (expressionEvaluator(thb[0], muEff) * bRB + self.data.bRBmu) vbMng(self, "DEL", "Done assembling reduced model.", 17) self.data.lastSetupMu = mu def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Computing RB solution at mu = {}.".format(mu), 12) mu, _, sizes = listScatter(mu, return_sizes = True) mu = self.checkParameterList(mu) req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(mu) == 0: vbMng(self, "MAIN", "Idling.", 37) uL, uT = recv(source = 0, tag = poolRank()) uApproxR = np.empty((uL, 0), dtype = uT) else: for j, mj in enumerate(mu): self.assembleReducedModel(mj) uAppR = np.linalg.solve(self.data.ARBmu, self.data.bRBmu) if j == 0: uApproxR = np.empty((len(uAppR), len(mu)), dtype = uAppR.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(uAppR), uAppR.dtype), dest = dest, tag = dest)] uApproxR[:, j] = uAppR for r in req: r.wait() uApproxR = matrixGatherv(uApproxR, sizes) self.uApproxReduced = sampleList(uApproxR) vbMng(self, "DEL", "Done computing RB solution.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, marginalVals : ListAny = [fp], jSupp : int = 1, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if not self.data.affinePoly: RROMPyWarning(("Unable to compute approximate poles due " "to parametric dependence (detected non-" "polynomial). Change HFEngine.affinePoly to True " "if necessary.")) return - if not hasattr(marginalVals, "__len__"): marginalVals = [marginalVals] + if not isinstance(marginalVals, Iterable): + marginalVals = [marginalVals] mVals = list(marginalVals) rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) ARBs = self.data.ARBs if self.data.npar > 1: mVals[rDim] = self.data.mu0(rDim) mVals = checkParameter(mVals, return_data = True).flatten() mVals[rDim] = fp ARBs = marginalizePolyList(ARBs, mVals, "auto") ev = eigvalsNonlinearDense(ARBs, jSupp = jSupp, **kwargs) return self.mapParameterList(ev, "B", [rDim])(0) diff --git a/rrompy/sampling/engines/sampling_engine_standard.py b/rrompy/sampling/engines/sampling_engine_standard.py index b663ba4..ce72522 100644 --- a/rrompy/sampling/engines/sampling_engine_standard.py +++ b/rrompy/sampling/engines/sampling_engine_standard.py @@ -1,358 +1,359 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from numbers import Number from copy import deepcopy as copy import numpy as np +from collections.abc import Iterable from warnings import catch_warnings from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, List, paramVal, Any, paramList, sampList, Tuple, TupleAny, DictAny, FigHandle) from rrompy.utilities.base.data_structures import getNewFilename from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert, RROMPy_READY, RROMPy_FRAGILE) from rrompy.utilities.numerical.hash_derivative import nextDerivativeIndices from rrompy.utilities.base.pickle_utilities import pickleDump, pickleLoad from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import sampleList, emptySampleList from rrompy.utilities.parallel import bcast, masterCore __all__ = ['SamplingEngineStandard'] class SamplingEngineStandard: def __init__(self, HFEngine:HFEng, sample_state : bool = False, verbosity : int = 10, timestamp : bool = True, scaleFactor : Np1D = None): self.sample_state = sample_state self.verbosity = verbosity self.timestamp = timestamp vbMng(self, "INIT", "Initializing sampling engine of type {}.".format(self.name()), 10) self.HFEngine = HFEngine vbMng(self, "DEL", "Done initializing sampling engine.", 10) self.scaleFactor = scaleFactor def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) @property def HFEngine(self): """Value of HFEngine. Its assignment resets history.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): self._HFEngine = HFEngine self.resetHistory() @property def scaleFactor(self): """Value of scaleFactor.""" return self._scaleFactor @scaleFactor.setter def scaleFactor(self, scaleFactor): if scaleFactor is None: scaleFactor = 1. - if not hasattr(scaleFactor, "__len__"): scaleFactor = [scaleFactor] + if not isinstance(scaleFactor, Iterable): scaleFactor = [scaleFactor] self._scaleFactor = scaleFactor def scaleDer(self, derIdx:Np1D): if not isinstance(self.scaleFactor, Number): RROMPyAssert(len(derIdx), len(self.scaleFactor), "Number of derivative indices") with catch_warnings(record = True) as w: res = np.prod(np.power(self.scaleFactor, derIdx)) if len(w) == 0: return res raise RROMPyException(("Error in computing derivative scaling " "factor: {}".format(str(w[-1].message)))) @property def feature_keys(self) -> TupleAny: return ["mus", "samples", "nsamples", "_derIdxs"] @property def feature_vals(self) -> DictAny: return {"mus":self.mus, "samples":self.samples, "nsamples":self.nsamples, "_derIdxs":self._derIdxs, "_scaleFactor":self.scaleFactor} def _mergeFeature(self, name:str, value:Any): if name in ["mus", "samples"]: getattr(self, name).append(value) elif name == "nsamples": self.nsamples += value elif name == "_derIdxs": self._derIdxs += value else: raise RROMPyException(("Invalid key {} in sampling engine " "merge.".format(name))) def store(self, filenameBase : str = "sampling_engine", forceNewFile : bool = True, local : bool = False) -> str: """Store sampling engine to file.""" filename = None if masterCore(): vbMng(self, "INIT", "Storing sampling engine to file.", 20) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) pickleDump(self.feature_vals, filename) vbMng(self, "DEL", "Done storing engine.", 20) if local: return filename = bcast(filename) return filename def load(self, filename:str, merge : bool = False): """Load sampling engine from file.""" if isinstance(filename, (list, tuple,)): self.load(filename[0], merge) for filen in filename[1 :]: self.load(filen, True) return vbMng(self, "INIT", "Loading stored sampling engine from file.", 20) datadict = pickleLoad(filename) for key in datadict: if key in self.feature_keys: if merge and key != "_scaleFactor": self._mergeFeature(key, datadict[key]) else: setattr(self, key, datadict[key]) self._mode = RROMPy_FRAGILE vbMng(self, "DEL", "Done loading stored engine.", 20) @property def projectionMatrix(self) -> Np2D: return self.samples.data def resetHistory(self): self._mode = RROMPy_READY self.samples = emptySampleList() self.nsamples = 0 self.mus = emptyParameterList() self._derIdxs = [] def setsample(self, u:sampList, overwrite : bool = False): if overwrite: self.samples[self.nsamples] = u else: if self.nsamples == 0: self.samples = sampleList(u) else: self.samples.append(u) def popSample(self): if hasattr(self, "nsamples") and self.nsamples > 1: if self.samples.shape[1] > self.nsamples: RROMPyWarning(("More than 'nsamples' memory allocated for " "samples. Popping empty sample column.")) self.nsamples += 1 self.nsamples -= 1 self.samples.pop() self.mus.pop() else: self.resetHistory() def preallocateSamples(self, u:sampList, mu:paramVal, n:int): self._mode = RROMPy_READY self.samples.reset((u.shape[0], n), u.dtype) self.samples[0] = u mu = checkParameter(mu, self.HFEngine.npar) self.mus.reset((n, self.HFEngine.npar)) self.mus[0] = mu[0] def postprocessu(self, u:sampList, overwrite : bool = False): self.setsample(u, overwrite) def postprocessuBulk(self): pass def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList: """ Solve linear system. Args: mu: Parameter value. Returns: Solution of system. """ mu = checkParameterList(mu, self.HFEngine.npar) vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15) u = self.HFEngine.solve(mu, RHS, return_state = self.sample_state) vbMng(self, "DEL", "Done solving HF model.", 15) return u def _getSampleConcurrence(self, mu:paramVal, previous:Np1D) -> sampList: RROMPyAssert(self._mode, message = "Cannot add samples.") if not (self.sample_state or self.HFEngine.isCEye): raise RROMPyException(("Derivatives of solution with non-scalar " "C not computable.")) from rrompy.utilities.numerical import dot if len(previous) >= len(self._derIdxs): self._derIdxs += nextDerivativeIndices(self._derIdxs, len(self.scaleFactor), len(previous) + 1 - len(self._derIdxs)) derIdx = self._derIdxs[len(previous)] mu = checkParameter(mu, self.HFEngine.npar) samplesOld = self.samples(previous) RHS = self.scaleDer(derIdx) * self.HFEngine.b(mu, derIdx) for j, derP in enumerate(self._derIdxs[: len(previous)]): diffP = [x - y for (x, y) in zip(derIdx, derP)] if np.all([x >= 0 for x in diffP]): RHS -= self.scaleDer(diffP) * dot(self.HFEngine.A(mu, diffP), samplesOld[j]) return self.solveLS(mu, RHS = RHS) def nextSample(self, mu:paramVal, overwrite : bool = False, postprocess : bool = True) -> Np1D: RROMPyAssert(self._mode, message = "Cannot add samples.") mu = checkParameter(mu, self.HFEngine.npar) muidxs = self.mus.findall(mu[0]) if len(muidxs) > 0: u = self._getSampleConcurrence(mu, np.sort(muidxs)) else: u = self.solveLS(mu) if postprocess: self.postprocessu(u, overwrite = overwrite) else: self.setsample(u, overwrite) if overwrite: self.mus[self.nsamples] = mu[0] else: self.mus.append(mu) self.nsamples += 1 return self.samples[self.nsamples - 1] def iterSample(self, mus:paramList) -> sampList: mus = checkParameterList(mus, self.HFEngine.npar) vbMng(self, "INIT", "Starting sampling iterations.", 5) n = len(mus) if n <= 0: raise RROMPyException(("Number of samples must be positive.")) self.resetHistory() if len(mus.unique()) != n: for j in range(n): vbMng(self, "MAIN", "Computing sample {} / {}.".format(j + 1, n), 7) self.nextSample(mus[j], overwrite = (j > 0), postprocess = False) if n > 1 and j == 0: self.preallocateSamples(self.samples[0], mus[0], n) else: self.setsample(self.solveLS(mus), overwrite = False) self.mus = copy(mus) self.nsamples = n self.postprocessuBulk() vbMng(self, "DEL", "Finished sampling iterations.", 5) return self.samples def plotSamples(self, warpings : List[List[callable]] = None, name : str = "u", **kwargs) -> Tuple[List[FigHandle], List[str]]: """ Do some nice plots of the samples. Args: warpings(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. Returns: Output filenames and figure handles. """ if warpings is None: warpings = [None] * self.nsamples figs = [None] * self.nsamples filesOut = [None] * self.nsamples for j in range(self.nsamples): pltOut = self.HFEngine.plot(self.samples[j], warpings[j], self.sample_state, "{}_{}".format(name, j), **kwargs) if isinstance(pltOut, (tuple,)): figs[j], filesOut[j] = pltOut else: figs[j] = pltOut if filesOut[0] is None: return figs return figs, filesOut def outParaviewSamples(self, warpings : List[List[callable]] = None, name : str = "u", filename : str = "out", times : Np1D = None, **kwargs) -> List[str]: """ Output samples to ParaView file. Args: warpings(optional): Domain warping functions. name(optional): Base name to be used for data output. filename(optional): Name of output file. times(optional): Timestamps. Returns: Output filenames. """ if warpings is None: warpings = [None] * self.nsamples if times is None: times = [0.] * self.nsamples filesOut = [None] * self.nsamples for j in range(self.nsamples): filesOut[j] = self.HFEngine.outParaview( self.samples[j], warpings[j], self.sample_state, "{}_{}".format(name, j), "{}_{}".format(filename, j), times[j], **kwargs) if filesOut[0] is None: return None return filesOut def outParaviewTimeDomainSamples(self, omegas : Np1D = None, warpings : List[List[callable]] = None, timeFinal : Np1D = None, periodResolution : List[int] = 20, name : str = "u", filename : str = "out", **kwargs) -> List[str]: """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. Returns: Output filename. """ if omegas is None: omegas = np.real(self.mus) if warpings is None: warpings = [None] * self.nsamples if not isinstance(timeFinal, (list, tuple,)): timeFinal = [timeFinal] * self.nsamples if not isinstance(periodResolution, (list, tuple,)): periodResolution = [periodResolution] * self.nsamples filesOut = [None] * self.nsamples for j in range(self.nsamples): filesOut[j] = self.HFEngine.outParaviewTimeDomain( self.samples[j], omegas[j], warpings[j], self.sample_state, timeFinal[j], periodResolution[j], "{}_{}".format(name, j), "{}_{}".format(filename, j), **kwargs) if filesOut[0] is None: return None return filesOut diff --git a/rrompy/utilities/exception_manager/generic_assert.py b/rrompy/utilities/exception_manager/generic_assert.py index 51a813d..777db68 100644 --- a/rrompy/utilities/exception_manager/generic_assert.py +++ b/rrompy/utilities/exception_manager/generic_assert.py @@ -1,31 +1,33 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # +from collections.abc import Iterable from rrompy.utilities.exception_manager import RROMPyException __all__ = ['RROMPy_READY', 'RROMPy_FRAGILE', 'RROMPyAssert'] RROMPy_READY = "ready" RROMPy_FRAGILE = "fragile" def RROMPyAssert(obj, checkVal = RROMPy_READY, what = "Current mode", message = ""): - if obj != checkVal and obj not in checkVal: + if obj != checkVal and (not isinstance(checkVal, Iterable) + or obj not in checkVal): raise RROMPyException("{} {} not compatible with {}. {}".format( what, obj, checkVal, message)) diff --git a/rrompy/utilities/expression/monomial_creator.py b/rrompy/utilities/expression/monomial_creator.py index 8803efb..8286b60 100644 --- a/rrompy/utilities/expression/monomial_creator.py +++ b/rrompy/utilities/expression/monomial_creator.py @@ -1,58 +1,59 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from rrompy.utilities.numerical.factorials import multibinom from rrompy.utilities.numerical.hash_derivative import (nextDerivativeIndices, hashIdxToDerivative as hashI, hashDerivativeToIdx as hashD) from rrompy.utilities.base.types import List, TupleAny __all__ = ["createMonomial", "createMonomialList"] def createMonomial(deg:List[int], return_derivatives : bool = False) -> List[List[TupleAny]]: - if not hasattr(deg, "__len__"): deg = [deg] + if not isinstance(deg, Iterable): deg = [deg] dim = len(deg) degj = hashD(deg) expr = [] for k in range(degj * return_derivatives + 1): degder = hashI(k, dim) derdiff = [x - y for (x, y) in zip(deg, degder)] if all([d >= 0 for d in derdiff]): mult = multibinom(deg, degder) if np.sum(derdiff) == 0: exprLoc = (mult,) else: exprLoc = ("prod", {"axis" : 1}, ("x", "**", derdiff)) if not np.isclose(mult, 1): exprLoc = exprLoc + ("*", mult,) expr += [exprLoc] else: expr += [(0.,)] if return_derivatives: expr += [None] return expr def createMonomialList(n:int, dim:int, return_derivatives : bool = False) -> List[List[TupleAny]]: derIdxs = nextDerivativeIndices([], dim, n) idxList = [] for j, der in enumerate(derIdxs): idxList += [createMonomial(der, return_derivatives)] return idxList diff --git a/rrompy/utilities/numerical/factorials.py b/rrompy/utilities/numerical/factorials.py index 19b4b07..2b6870f 100644 --- a/rrompy/utilities/numerical/factorials.py +++ b/rrompy/utilities/numerical/factorials.py @@ -1,33 +1,34 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from numpy import prod from scipy.special import binom, factorial +from collections.abc import Iterable from rrompy.utilities.base.types import List __all__ = ['multibinom', 'multifactorial'] def multibinom(x:List[int], y:List[int]) -> int: - if not hasattr(x, "__len__"): x = [x] - if not hasattr(y, "__len__"): y = [y] + if not isinstance(x, Iterable): x = [x] + if not isinstance(y, Iterable): y = [y] return int(prod([binom(a, b) for (a, b) in zip(x, y)])) def multifactorial(x:List[int]) -> int: - if not hasattr(x, "__len__"): x = [x] + if not isinstance(x, Iterable): x = [x] return int(prod([factorial(a) for a in x])) diff --git a/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_rational.py b/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_rational.py index 7af0f52..123c4e2 100644 --- a/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_rational.py +++ b/rrompy/utilities/poly_fitting/heaviside/heaviside_to_from_rational.py @@ -1,96 +1,98 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.poly_fitting.polynomial.polynomial_interpolator import ( PolynomialInterpolator, PolynomialInterpolatorNodal) from rrompy.utilities.poly_fitting.polynomial.vander import polyvander from rrompy.utilities.poly_fitting.custom_fit import customFit from rrompy.utilities.base.types import Np1D, Np2D, Tuple, DictAny, interpEng from rrompy.parameter.parameter_sampling import (RandomSampler as RS, QuadratureSampler as QS) from .val import polyval from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert __all__ = ['heaviside2rational', 'rational2heaviside'] def heaviside2rational(c:Np2D, poles:Np1D, murange : Np1D = None, basis : str = "MONOMIAL_HEAVISIDE", basisD : str = None, parameterMap : DictAny = 1.) \ -> Tuple[interpEng, interpEng]: basisN = basis.split("_")[0] if basisD is None: basisD = basisN num = PolynomialInterpolator() den = PolynomialInterpolatorNodal() den.polybasis, den.nodes = basisD, poles if murange is None: multiplier = [1., 1.j] avgs = [np.mean(np.real(poles)), np.mean(np.imag(poles))] ranges = np.array([[np.min(np.real(poles)), np.max(np.real(poles))], [np.min(np.imag(poles)), np.max(np.imag(poles))]]) domIdx = np.argmax(ranges[:, 1] - ranges[:, 0]) murange = [multiplier[domIdx] * x + multiplier[1 - domIdx] * avgs[1 - domIdx] for x in ranges[domIdx, :]] if basis == "CHEBYSHEV": sampler = QS(murange, "CHEBYSHEV", parameterMap) elif basis == "LEGENDRE": sampler = QS(murange, "GAUSSLEGENDRE", parameterMap) else: sampler = RS(murange, "HALTON", parameterMap) xAux = sampler.generatePoints(len(c)) valsAux = den(xAux) * polyval(xAux, c, poles, basis) num.setupByInterpolation(xAux, valsAux, len(c) - 1, basisN) return num, den def rational2heaviside(num:interpEng, den:interpEng, murange : Np1D = np.array([-1., 1.]), scl : Np1D = None, parameterMap : DictAny = 1.) \ -> Tuple[Np2D, Np1D, str]: if (not isinstance(num, PolynomialInterpolator) or not isinstance(den, PolynomialInterpolator)): raise RROMPyException(("Rational numerator and denominator must be " "polynomial interpolators.")) RROMPyAssert(num.npar, 1, "Number of parameters") RROMPyAssert(den.npar, 1, "Number of parameters") basis = num.polybasis + "_HEAVISIDE" c = np.zeros_like(num.coeffs) poles = den.roots() - Psp = num(poles) - Qsp = den(poles) - Qspder = den(poles, 1, scl) - polesBad = np.abs(Qsp) >= 1e-5 - Psp[..., polesBad] = 0. - Qspder[polesBad] = 1. - c[: len(poles)] = (Psp / Qspder).T + if len(poles) > 0: + Psp = num(poles) + Qsp = den(poles) + Qspder = den(poles, 1, scl) + polesBad = np.abs(Qsp) >= 1e-5 + Psp[..., polesBad] = 0. + Qspder[polesBad] = 1. + c[: len(poles)] = (Psp / Qspder).T if len(c) > len(poles): from rrompy.parameter.parameter_sampling import (RandomSampler as RS, QuadratureSampler as QS) if num.polybasis == "CHEBYSHEV": sampler = QS(murange, "CHEBYSHEV", parameterMap) elif num.polybasis == "LEGENDRE": sampler = QS(murange, "GAUSSLEGENDRE", parameterMap) else: sampler = RS(murange, "HALTON", parameterMap) xAux = sampler.generatePoints(len(c)) - valsAux = (num(xAux) / den(xAux) - - polyval(xAux, c, poles, basis)).T + valsAux = num(xAux) / den(xAux) + if len(poles) > 0: + valsAux -= polyval(xAux, c, poles, basis) VanAux = polyvander(xAux, [len(c) - len(poles) - 1], num.polybasis) - c[len(poles) :] = customFit(VanAux, valsAux) - poles[polesBad] = np.inf + c[len(poles) :] = customFit(VanAux, valsAux.T) + if len(poles) > 0: poles[polesBad] = np.inf return c, poles, basis diff --git a/rrompy/utilities/poly_fitting/nearest_neighbor/nearest_neighbor_interpolator.py b/rrompy/utilities/poly_fitting/nearest_neighbor/nearest_neighbor_interpolator.py index 984450e..96a2d90 100644 --- a/rrompy/utilities/poly_fitting/nearest_neighbor/nearest_neighbor_interpolator.py +++ b/rrompy/utilities/poly_fitting/nearest_neighbor/nearest_neighbor_interpolator.py @@ -1,90 +1,91 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # -import numpy as np from copy import deepcopy as copy +import numpy as np +from collections.abc import Iterable from rrompy.utilities.base.types import List, ListAny, Np1D, Np2D, paramList from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from .val import polyval from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.parameter import checkParameterList __all__ = ['NearestNeighborInterpolator'] class NearestNeighborInterpolator(GenericInterpolator): def __init__(self, other = None): if other is None: return self.support = other.support self.coeffsLocal = other.coeffsLocal self.nNeighbors = other.nNeighbors self.directionalWeights = other.directionalWeights self.npar = other.npar @property def shape(self): sh = self.coeffsLocal.shape[1 :] if self.coeffsLocal.ndim > 1 else 1 return sh def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): if der is not None and np.sum(der) > 0: return np.zeros(self.coeffsLocal.shape[1 :] + (len(mu),)) return polyval(mu, self.coeffsLocal, self.support, self.nNeighbors, self.directionalWeights) def __copy__(self): return NearestNeighborInterpolator(self) def __deepcopy__(self, memo): other = NearestNeighborInterpolator() (other.support, other.coeffsLocal, other.nNeighbors, other.directionalWeights, other.npar) = copy((self.support, self.coeffsLocal, self.nNeighbors, self.directionalWeights, self.npar), memo) return other def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") self.coeffsLocal = dot(self.coeffsLocal, A) def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) - if not hasattr(nleft, "__len__"): nleft = [nleft] - if not hasattr(nright, "__len__"): nright = [nright] + if not isinstance(nleft, Iterable): nleft = [nleft] + if not isinstance(nright, Iterable): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] + [(l, r) for l, r in zip(nleft, nright)] self.coeffsLocal = np.pad(self.coeffsLocal, padwidth, "constant", constant_values = (0., 0.)) def setupByInterpolation(self, support:paramList, values:ListAny, nNeighbors : int = 1, directionalWeights : Np1D = None): support = checkParameterList(support) RROMPyAssert(len(support), len(values), "Number of support values") self.support = copy(support) self.npar = support.shape[1] self.coeffsLocal = values self.nNeighbors = max(1, nNeighbors) if directionalWeights is None: directionalWeights = [1.] * self.npar self.directionalWeights = np.array(directionalWeights) RROMPyAssert(len(support), len(values), "Number of support points") return True, None diff --git a/rrompy/utilities/poly_fitting/piecewise_linear/piecewise_linear_interpolator.py b/rrompy/utilities/poly_fitting/piecewise_linear/piecewise_linear_interpolator.py index ef8d67a..4347d79 100644 --- a/rrompy/utilities/poly_fitting/piecewise_linear/piecewise_linear_interpolator.py +++ b/rrompy/utilities/poly_fitting/piecewise_linear/piecewise_linear_interpolator.py @@ -1,96 +1,97 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # +from copy import deepcopy as copy import numpy as np from scipy.linalg import solve_triangular -from copy import deepcopy as copy +from collections.abc import Iterable from rrompy.utilities.base.types import List, ListAny, Np1D, Np2D, paramList from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from .kernel import vander, val from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameterList __all__ = ['PiecewiseLinearInterpolator'] class PiecewiseLinearInterpolator(GenericInterpolator): def __init__(self, other = None): if other is None: return self.support = other.support self.lims = other.lims self.coeffs = other.coeffs self.depths = other.depths self.npar = other.npar self.kind = other.kind @property def shape(self): sh = self.coeffs.shape[1 :] if self.coeffs.ndim > 1 else 1 return sh def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): if der is not None and np.sum(der) > 0: raise RROMPyException(("Cannot take derivatives of piecewise " "linear function.")) return val(mu, self.coeffs, self.support, self.depths, self.kind, self.lims) def __copy__(self): return PiecewiseLinearInterpolator(self) def __deepcopy__(self, memo): other = PiecewiseLinearInterpolator() (other.support, other.lims, other.coeffs, other.depths, other.npar, other.kind) = copy((self.support, self.lims, self.coeffs, self.depths, self.npar, self.kind), memo) return other def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") self.coeffs = dot(self.coeffs, A) def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) - if not hasattr(nleft, "__len__"): nleft = [nleft] - if not hasattr(nright, "__len__"): nright = [nright] + if not isinstance(nleft, Iterable): nleft = [nleft] + if not isinstance(nright, Iterable): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] + [(l, r) for l, r in zip(nleft, nright)] self.coeffs = np.pad(self.coeffs, padwidth, "constant", constant_values = (0., 0.)) padwidth = [(0, 0)] * (self.npar - 1) + padwidth def setupByInterpolation(self, support:paramList, values:ListAny, lims:paramList, depths:Np2D, kind : str = "PIECEWISE_LINEAR_UNIFORM"): support = checkParameterList(support) RROMPyAssert(len(support), len(values), "Number of support values") self.support = copy(support) self.npar = support.shape[1] lims = checkParameterList(lims, self.npar) self.lims = copy(lims) self.depths = copy(depths) self.kind = kind van = vander(support, depths, kind, lims) outDim = values.shape[1:] values = values.reshape(values.shape[0], -1) self.coeffs = solve_triangular(van, values, unit_diagonal = True, lower = True).reshape((len(support),) + outDim) diff --git a/rrompy/utilities/poly_fitting/polynomial/marginalize.py b/rrompy/utilities/poly_fitting/polynomial/marginalize.py index 66da859..4d27854 100644 --- a/rrompy/utilities/poly_fitting/polynomial/marginalize.py +++ b/rrompy/utilities/poly_fitting/polynomial/marginalize.py @@ -1,58 +1,59 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # -from numpy import array, polynomial as po from copy import deepcopy as copy +from numpy import array, polynomial as po +from collections.abc import Iterable from .base import polybases from rrompy.utilities.base.types import Np1D, Np2D from rrompy.utilities.base import freepar as fp from rrompy.utilities.exception_manager import RROMPyAssert, RROMPyException __all__ = ['polymarginalize'] def polymarginalize(c:Np2D, basis:str, marginalVals : Np1D = [fp], nMarginal : int = None) -> Np1D: if not hasattr(c, "ndim"): c = array(c) ndim = c.ndim - if not hasattr(marginalVals, "__len__"): marginalVals = [marginalVals] + if not isinstance(marginalVals, Iterable): marginalVals = [marginalVals] marginalVals = list(marginalVals) if basis.upper() not in polybases: raise RROMPyException("Polynomial basis not recognized.") polyvalbase = {"CHEBYSHEV" : po.chebyshev.chebval, "LEGENDRE" : po.legendre.legval, "MONOMIAL" : po.polynomial.polyval}[basis.upper()] RROMPyAssert(ndim, len(marginalVals), "Marginalized variables") marginalDims = [] for j in range(len(marginalVals)): if marginalVals[j] == fp: marginalDims += [c.shape[j]] if nMarginal is not None and len(marginalDims) != nMarginal: raise RROMPyException(("Exactly {} 'freepar' entries in marginalVals " "must be provided.").format(nMarginal)) cEff = [copy(c)] for d in range(ndim): if marginalVals[d] != fp: for dj in range(len(cEff)): cEff[dj] = polyvalbase(marginalVals[d], cEff[dj], tensor = False) else: cEff2 = [] for dj in range(len(cEff)): cEff2 += list(cEff[dj]) cEff = copy(cEff2) return array(cEff).reshape(tuple(marginalDims)) diff --git a/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py b/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py index 507d287..d23c708 100644 --- a/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py +++ b/rrompy/utilities/poly_fitting/polynomial/polynomial_interpolator.py @@ -1,221 +1,222 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # +from copy import deepcopy as copy import numpy as np from scipy.special import factorial as fact +from collections.abc import Iterable from itertools import combinations -from copy import deepcopy as copy from rrompy.utilities.base.types import (List, ListAny, DictAny, Np1D, Np2D, paramList) from rrompy.utilities.base import freepar as fp from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from rrompy.utilities.poly_fitting.custom_fit import customFit from .base import polyfitname from .val import polyval from .roots import polyroots from .vander import polyvander as pv from .polynomial_algebra import changePolyBasis, polyTimes from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.degree import degreeTotalToFull from rrompy.utilities.exception_manager import RROMPyAssert, RROMPyException from rrompy.parameter import checkParameterList __all__ = ['PolynomialInterpolator', 'PolynomialInterpolatorNodal'] class PolynomialInterpolator(GenericInterpolator): def __init__(self, other = None): if other is None: return self.coeffs = other.coeffs self.npar = other.npar self.polybasis = other.polybasis @property def shape(self): if self.coeffs.ndim > self.npar: sh = self.coeffs.shape[self.npar :] else: sh = tuple([1]) return sh @property def deg(self): return [x - 1 for x in self.coeffs.shape[: self.npar]] def __getitem__(self, key): return self.coeffs[key] def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): if hasattr(self, "_dirPivot"): mu = checkParameterList(mu)(self._dirPivot) return polyval(mu, self.coeffs, self.polybasis, der, scl) def __copy__(self): return PolynomialInterpolator(self) def __deepcopy__(self, memo): other = PolynomialInterpolator() other.coeffs, other.npar, other.polybasis = copy( (self.coeffs, self.npar, self.polybasis), memo) return other def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) - if not hasattr(nleft, "__len__"): nleft = [nleft] - if not hasattr(nright, "__len__"): nright = [nright] + if not isinstance(nleft, Iterable): nleft = [nleft] + if not isinstance(nright, Iterable): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] * self.npar padwidth = padwidth + [(l, r) for l, r in zip(nleft, nright)] self.coeffs = np.pad(self.coeffs, padwidth, "constant", constant_values = (0., 0.)) def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") self.coeffs = dot(self.coeffs, A) def setupByInterpolation(self, support:paramList, values:ListAny, deg:int, polybasis : str = "MONOMIAL", verbose : bool = True, totalDegree : bool = True, vanderCoeffs : DictAny = {}, fitCoeffs : DictAny = {}): support = checkParameterList(support) self.npar = support.shape[1] self.polybasis = polybasis - if not totalDegree and not hasattr(deg, "__len__"): + if not totalDegree and not isinstance(deg, Iterable): deg = [deg] * self.npar vander = pv(support, deg, basis = polybasis, **vanderCoeffs) RROMPyAssert(len(vander), len(values), "Number of support values") outDim = values.shape[1:] values = values.reshape(values.shape[0], -1) fitOut = customFit(vander, values, full = True, **fitCoeffs) P = fitOut[0] if verbose: msg = ("Fitting {} samples with degree {} through {}... " "Conditioning of LS system: {:.4e}.").format( len(vander), deg, polyfitname(self.polybasis), fitOut[1][2][0] / fitOut[1][2][-1]) else: msg = None if totalDegree: self.coeffs = degreeTotalToFull(tuple([deg + 1] * self.npar) + outDim, self.npar, P) else: self.coeffs = P.reshape(tuple([d + 1 for d in deg]) + outDim) return fitOut[1][1] == vander.shape[1], msg def roots(self, marginalVals : ListAny = [fp]): RROMPyAssert(self.shape, (1,), "Shape of output") RROMPyAssert(len(marginalVals), self.npar, "Number of parameters") rDim = marginalVals.index(fp) if rDim < len(marginalVals) - 1 and fp in marginalVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) return polyroots(self.coeffs, self.polybasis, marginalVals) class PolynomialInterpolatorNodal(PolynomialInterpolator): def __init__(self, other = None): self.npar = 1 if other is None: return self.nodes = other.nodes self.polybasis = other.polybasis @property def nodes(self): return self._nodes @nodes.setter def nodes(self, nodes): self.coeffs = None self._nodes = nodes @property def coeffs(self): if self._coeffs is None: self.buildCoeffs() return self._coeffs @coeffs.setter def coeffs(self, coeffs): self._coeffs = coeffs @property def shape(self): return (1,) @property def deg(self): return [len(self.nodes)] def __getitem__(self, key): return self.coeffs[key] def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): dirPivot = self._dirPivot if hasattr(self, "_dirPivot") else 0 if der is None: der = 0 elif isinstance(der, (list,tuple,np.ndarray,)): der = der[dirPivot] if scl is None: scl = 1. elif isinstance(scl, (list,tuple,np.ndarray,)): scl = scl[dirPivot] mu = checkParameterList(mu)(dirPivot) val = np.zeros(len(mu), dtype = np.complex) if der == self.deg[0]: val[:] = 1. elif der >= 0 and der < self.deg[0]: plDist = (np.repeat(np.expand_dims(mu, 1), self.deg[0], axis = 1) - self.nodes.reshape(1, -1)) for terms in combinations(np.arange(self.deg[0]), self.deg[0] - der): val += np.prod(plDist[:, list(terms)], axis = 1) return scl ** der * fact(der) * val def __copy__(self): return PolynomialInterpolatorNodal(self) def __deepcopy__(self, memo): other = PolynomialInterpolatorNodal() other.nodes, other.polybasis = copy((self.nodes, self.polybasis), memo) return other def buildCoeffs(self): local = [np.array([- pl, 1.], dtype = np.complex) for pl in self.nodes] N = len(local) while N > 1: for j in range(N // 2): local[j] = polyTimes(local[j], local[- 1 - j]) local = local[(N - 1) // 2 :: -1] N = len(local) self._coeffs = changePolyBasis(local[0], None, "MONOMIAL", self.polybasis) def pad(self, *args, **kwargs): raise RROMPyException(("Padding not allowed for polynomials in nodal " "form")) def postmultiplyTensorize(self, *args, **kwargs): raise RROMPyException(("Post-multiply not allowed for polynomials in " "nodal form")) def setupByInterpolation(self, support:paramList, *args, **kwargs): support = checkParameterList(support) self.npar = support.shape[1] if self.npar > 1: raise RROMPyException(("Polynomial in nodal form must have " "scalar output")) output = super().setupByInterpolation(support, *args, **kwargs) self._nodes = super().roots() return output def roots(self, marginalVals : ListAny = [fp]): return self.nodes diff --git a/rrompy/utilities/poly_fitting/polynomial/vander.py b/rrompy/utilities/poly_fitting/polynomial/vander.py index 20aa591..1616c64 100644 --- a/rrompy/utilities/poly_fitting/polynomial/vander.py +++ b/rrompy/utilities/poly_fitting/polynomial/vander.py @@ -1,129 +1,130 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from .base import polybases from .der import polyder from rrompy.utilities.base.types import Np1D, Np2D, List, paramList from rrompy.utilities.numerical.degree import totalDegreeSet from rrompy.parameter import checkParameterList from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert __all__ = ['polyvander'] def firstDerTransition(dim:int, TDirac:List[Np2D], basis:str, scl : Np1D = None) -> Np2D: C_m = np.zeros((dim, len(TDirac), len(TDirac)), dtype = float) for j, Tj in enumerate(TDirac): m, om = [0] * dim, [(0, 0)] * dim for idx in range(dim): m[idx], om[idx] = 1, (0, 1) J_der = polyder(Tj, basis, m, scl) if J_der.size != len(TDirac): J_der = np.pad(J_der, mode = "constant", pad_width = om) C_m[idx, :, j] = np.ravel(J_der) m[idx], om[idx] = 0, (0, 0) return C_m def polyvander(x:paramList, degs:List[int], basis:str, derIdxs : List[List[List[int]]] = None, reorder : List[int] = None, scl : Np1D = None, forceTotalDegree : bool = False) -> Np2D: """ Compute full Hermite-Vandermonde matrix with specified derivative directions. E.g. assume that we want to obtain the Vandermonde matrix for (value, derx, derx2) at x = [0, 0], (value, dery) at x = [1, 0], (dery, derxy) at x = [0, 0], of degree 3 in x and 1 in y, using Chebyshev polynomials. This can be done by polyvander([[0, 0], [1, 0]], # unique sample points [3, 1], # polynomial degree "chebyshev", # polynomial family [ [[0, 0], [1, 0], [2, 0], [0, 1], [1, 1]], # derivative directions at first point [[0, 0], [0, 1]] # derivative directions at second point ], [0, 1, 2, 5, 6, 3, 4] # reorder indices ) """ x = checkParameterList(x) dim = x.shape[1] totalDeg = (forceTotalDegree or not isinstance(degs, (list, tuple, np.ndarray,))) if forceTotalDegree and isinstance(degs, (list, tuple, np.ndarray,)): if np.any(np.array(degs) != degs[0]): raise RROMPyException(("Cannot force total degree if prescribed " "degrees are different")) degs = degs[0] if not isinstance(degs, (list, tuple, np.ndarray,)): degs = [degs] * dim RROMPyAssert(len(degs), dim, "Number of parameters") x_un, idx_un = x.unique(return_inverse = True) if len(x_un) < len(x): raise RROMPyException("Sample points must be distinct.") del x_un if basis.upper() not in polybases: raise RROMPyException("Polynomial basis not recognized.") vanderbase = {"CHEBYSHEV" : np.polynomial.chebyshev.chebvander, "LEGENDRE" : np.polynomial.legendre.legvander, "MONOMIAL" : np.polynomial.polynomial.polyvander }[basis.upper()] VanBase = vanderbase(x(0), degs[0]) for j in range(1, dim): VNext = vanderbase(x(j), degs[j]) for jj in range(j): VNext = np.expand_dims(VNext, 1) VanBase = VanBase[..., None] * VNext VanBase = VanBase.reshape((len(x), -1)) if derIdxs is None or VanBase.shape[-1] <= 1: Van = VanBase else: derFlat, idxRep = [], [] for j, derIdx in enumerate(derIdxs): derFlat += derIdx[:] idxRep += [j] * len(derIdx[:]) for j in range(len(derFlat)): - if not hasattr(derFlat[j], "__len__"): + if not isinstance(derFlat[j], Iterable): derFlat[j] = [derFlat[j]] RROMPyAssert(len(derFlat[j]), dim, "Number of dimensions") TDirac = [y.reshape([d + 1 for d in degs]) for y in np.eye(VanBase.shape[-1], dtype = int)] Cs_loc = firstDerTransition(dim, TDirac, basis, scl) Van = np.empty((len(derFlat), VanBase.shape[-1]), dtype = VanBase.dtype) for j in range(len(derFlat)): Van[j, :] = VanBase[idxRep[j], :] for k in range(dim): for der in range(derFlat[j][k]): Van[j, :] = Van[j, :].dot(Cs_loc[k]) / (der + 1) if reorder is not None: Van = Van[reorder, :] if not totalDeg: return Van derIdxs, mask = totalDegreeSet(degs[0], dim, return_mask = True) ordIdxs = np.empty(len(derIdxs), dtype = int) derTotal = np.array([np.sum(y) for y in derIdxs]) idxPrev = 0 rangeAux = np.arange(len(derIdxs)) for j in range(degs[0] + 1): idxLocal = rangeAux[derTotal == j][::-1] idxPrev += len(idxLocal) ordIdxs[idxPrev - len(idxLocal) : idxPrev] = idxLocal return Van[:, mask][:, ordIdxs] diff --git a/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py b/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py index 1fce87c..a7fa3e6 100644 --- a/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py +++ b/rrompy/utilities/poly_fitting/radial_basis/radial_basis_interpolator.py @@ -1,133 +1,134 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # -import numpy as np from copy import deepcopy as copy +import numpy as np +from collections.abc import Iterable from rrompy.utilities.base.types import (List, ListAny, DictAny, Np1D, Np2D, paramList) from rrompy.utilities.poly_fitting.interpolator import GenericInterpolator from rrompy.utilities.poly_fitting.custom_fit import customFit from .base import polyfitname from .val import polyval from .vander import polyvander as pv from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.degree import degreeTotalToFull from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameterList __all__ = ['RadialBasisInterpolator'] class RadialBasisInterpolator(GenericInterpolator): def __init__(self, other = None): if other is None: return self.support = other.support self.coeffsGlobal = other.coeffsGlobal self.coeffsLocal = other.coeffsLocal self.directionalWeights = other.directionalWeights self.npar = other.npar self.polybasis = other.polybasis @property def shape(self): sh = self.coeffsLocal.shape[1 :] if self.coeffsLocal.ndim > 1 else 1 return sh @property def deg(self): return [x - 1 for x in self.coeffsGlobal.shape[: self.npar]] def __call__(self, mu:paramList, der : List[int] = None, scl : Np1D = None): if der is not None and np.sum(der) > 0: raise RROMPyException(("Cannot take derivatives of radial basis " "function.")) return polyval(mu, self.coeffsGlobal, self.coeffsLocal, self.support, self.directionalWeights, self.polybasis) def __copy__(self): return RadialBasisInterpolator(self) def __deepcopy__(self, memo): other = RadialBasisInterpolator() (other.support, other.coeffsGlobal, other.coeffsLocal, other.directionalWeights, other.npar, other.polybasis) = copy( (self.support, self.coeffsGlobal, self.coeffsLocal, self.directionalWeights, self.npar, self.polybasis), memo) return other def postmultiplyTensorize(self, A:Np2D): RROMPyAssert(A.shape[0], self.shape[-1], "Shape of output") self.coeffsLocal = dot(self.coeffsLocal, A) self.coeffsGlobal = dot(self.coeffsGlobal, A) def pad(self, nleft : List[int] = None, nright : List[int] = None): if nleft is None: nleft = [0] * len(self.shape) if nright is None: nright = [0] * len(self.shape) - if not hasattr(nleft, "__len__"): nleft = [nleft] - if not hasattr(nright, "__len__"): nright = [nright] + if not isinstance(nleft, Iterable): nleft = [nleft] + if not isinstance(nright, Iterable): nright = [nright] RROMPyAssert(len(self.shape), len(nleft), "Shape of output") RROMPyAssert(len(self.shape), len(nright), "Shape of output") padwidth = [(0, 0)] + [(l, r) for l, r in zip(nleft, nright)] self.coeffsLocal = np.pad(self.coeffsLocal, padwidth, "constant", constant_values = (0., 0.)) padwidth = [(0, 0)] * (self.npar - 1) + padwidth self.coeffsGlobal = np.pad(self.coeffsGlobal, padwidth, "constant", constant_values = (0., 0.)) def setupByInterpolation(self, support:paramList, values:ListAny, deg:int, polybasis : str = "MONOMIAL_GAUSSIAN", directionalWeights : Np1D = None, verbose : bool = True, totalDegree : bool = True, vanderCoeffs : DictAny = {}, fitCoeffs : DictAny = {}): support = checkParameterList(support) RROMPyAssert(len(support), len(values), "Number of support values") self.support = copy(support) if "reorder" in vanderCoeffs.keys(): self.support = self.support[vanderCoeffs["reorder"]] self.npar = support.shape[1] if directionalWeights is None: directionalWeights = [1.] * self.npar directionalWeights = np.array(directionalWeights) self.polybasis = polybasis - if not totalDegree and not hasattr(deg, "__len__"): + if not totalDegree and not isinstance(deg, Iterable): deg = [deg] * self.npar vander, self.directionalWeights = pv(support, deg, basis = polybasis, directionalWeights = directionalWeights, **vanderCoeffs) outDim = values.shape[1:] values = values.reshape(values.shape[0], -1) values = np.pad(values, ((0, len(vander) - len(values)), (0, 0)), "constant") fitOut = customFit(vander, values, full = True, **fitCoeffs) P = fitOut[0][len(support) :] if verbose: msg = ("Fitting {}+{} samples with degree {} through {}... " "Conditioning of LS system: {:.4e}.").format( len(support), len(vander) - len(support), deg, polyfitname(self.polybasis), fitOut[1][2][0] / fitOut[1][2][-1]) else: msg = None self.coeffsLocal = fitOut[0][: len(support)].reshape((len(support),) + outDim) if totalDegree: self.coeffsGlobal = degreeTotalToFull(tuple([deg + 1] * self.npar) + outDim, self.npar, P) else: self.coeffsGlobal = P.reshape(tuple([d + 1 for d in deg]) + outDim) return fitOut[1][1] == vander.shape[1], msg diff --git a/rrompy/utilities/poly_fitting/radial_basis/vander.py b/rrompy/utilities/poly_fitting/radial_basis/vander.py index 51d3228..101e5d6 100644 --- a/rrompy/utilities/poly_fitting/radial_basis/vander.py +++ b/rrompy/utilities/poly_fitting/radial_basis/vander.py @@ -1,95 +1,96 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np +from collections.abc import Iterable from .kernel import kernels from rrompy.utilities.poly_fitting.polynomial.vander import polyvander as pvP from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List, paramList from rrompy.parameter import checkParameterList from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert __all__ = ['rbvander', 'polyvander'] def rbvander(x:paramList, basis:str, reorder : List[int] = None, directionalWeights : Np1D = None) -> Np2D: """Compute radial-basis-Vandermonde matrix.""" x = checkParameterList(x) x_un = x.unique() nx = len(x) if len(x_un) < nx: raise RROMPyException("Sample points must be distinct.") del x_un x = x.data if directionalWeights is None: directionalWeights = np.ones(x.shape[1]) - elif not hasattr(directionalWeights, "__len__"): + elif not isinstance(directionalWeights, Iterable): directionalWeights = directionalWeights * np.ones(x.shape[1]) RROMPyAssert(len(directionalWeights), x.shape[1], "Number of directional weights") basis = basis.upper() if basis not in kernels.keys(): raise RROMPyException("Radial basis not recognized.") radialker = kernels[basis] Van = np.zeros((nx, nx)) if reorder is not None: x = x[reorder] xDiff2V, xDiff2I, xDiff2J = np.zeros(0), [], [] for i in range(nx - 1): xiD2Loc = np.sum(np.abs((x[i + 1 :] - x[i]) * directionalWeights) ** 2., axis = 1) xiD2Good = np.where(xiD2Loc <= radialker.threshold)[0] xDiff2V = np.append(xDiff2V, xiD2Loc[xiD2Good]) xDiff2I += [i] * len(xiD2Good) xDiff2J += list(i + 1 + xiD2Good) kernelV = radialker(xDiff2V, apply_threshold = False) Van = np.eye(nx) Van[xDiff2I, xDiff2J] = kernelV Van[xDiff2J, xDiff2I] = kernelV return Van def polyvander(x:paramList, degs:List[int], basis:str, derIdxs : List[List[List[int]]] = None, reorder : List[int] = None, directionalWeights : Np1D = None, scl : Np1D = None, forceTotalDegree : bool = False, optimizeScalingBounds : List[float] = [-1., -1.], maxOptimizeIter : int = 10) -> Tuple[Np2D, Np1D]: """ Compute full Hermite-Vandermonde matrix with specified derivative directions. """ if derIdxs is not None and np.sum(np.sum(derIdxs)) > 0: raise RROMPyException(("Cannot take derivatives of radial basis " "function.")) basisp, basisr = basis.split("_") VanP = pvP(x, degs, basisp, derIdxs = derIdxs, reorder = reorder, scl = scl, forceTotalDegree = forceTotalDegree) VanZ = np.zeros([VanP.shape[1]] * 2) optDir, optFact = 0, 100. for it in range(maxOptimizeIter): VanR = rbvander(x, basisr, reorder = reorder, directionalWeights = directionalWeights) Van = np.block([[VanR, VanP], [VanP.T.conj(), VanZ]]) if optimizeScalingBounds[0] < 0. or it == maxOptimizeIter - 1: break ncond = np.linalg.cond(Van) if ncond < optimizeScalingBounds[0]: if optDir != -1: optFact **= .5 optDir, directionalWeights = -1, directionalWeights / optFact elif ncond > optimizeScalingBounds[1]: if optDir != 1: optFact **= .5 optDir, directionalWeights = 1, directionalWeights * optFact else: break return Van, directionalWeights