diff --git a/rrompy/hfengines/base/matrix_engine_base.py b/rrompy/hfengines/base/matrix_engine_base.py index b5cac5d..625a211 100644 --- a/rrompy/hfengines/base/matrix_engine_base.py +++ b/rrompy/hfengines/base/matrix_engine_base.py @@ -1,519 +1,519 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np import scipy.sparse as scsp from numbers import Number from matplotlib import pyplot as plt from copy import deepcopy as copy, copy as softcopy from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, strLst, TupleAny, List, ListAny, DictAny, paramVal, paramList, sampList) from rrompy.utilities.base import purgeList, getNewFilename from rrompy.utilities.expression import (expressionEvaluator, createMonomial, createMonomialList) from rrompy.utilities.numerical import (hashDerivativeToIdx as hashD, solve as tsolve, dot, customPInv) from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import sampleList, emptySampleList from rrompy.solver import setupSolver __all__ = ['MatrixEngineBase'] class MatrixEngineBase: """ Generic solver for parametric matrix problems. Attributes: verbosity: Verbosity level. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product. energyNormDualMatrix: Scipy sparse matrix representing dual inner product. energyNormPartialDualMatrix: Scipy sparse matrix representing dual inner product without duality. """ def __init__(self, verbosity : int = 10, timestamp : bool = True): self.verbosity = verbosity self.timestamp = timestamp self._affinePoly = True self.nAs, self.nbs = 1, 1 self._C = None self.setSolver("SPSOLVE", {"use_umfpack" : False}) self.npar = 0 self.outputNormMatrix = 1. def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def __dir_base__(self): return [x for x in self.__dir__() if x[:2] != "__"] def __deepcopy__(self, memo): return softcopy(self) @property def npar(self): """Value of npar.""" return self._npar @npar.setter def npar(self, npar): nparOld = self._npar if hasattr(self, "_npar") else -1 if npar != nparOld: self.rescalingExp = [1.] * npar self._npar = npar @property def nAs(self): """Value of nAs.""" return self._nAs @nAs.setter def nAs(self, nAs): self._nAs = nAs self.resetAs() @property def nbs(self): """Value of nbs.""" return self._nbs @nbs.setter def nbs(self, nbs): self._nbs = nbs self.resetbs() @property def C(self): """Value of C.""" if self._C is None: self.buildC() return self._C @property def isCEye(self): return isinstance(self.C, Number) @property def affinePoly(self): return self._affinePoly @property def spacedim(self): return self.bs[0].shape[0] def checkParameter(self, mu:paramVal): return checkParameter(mu, self.npar) def checkParameterList(self, mu:paramList): return checkParameterList(mu, self.npar) def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ self.energyNormMatrix = 1. def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product. """ self.energyNormDualMatrix = 1. def buildEnergyNormPartialDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ self.energyNormPartialDualMatrix = 1. def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False, dual : bool = False, is_state : bool = True) -> Np2D: """Scalar product.""" if is_state or self.isCEye: if dual: if not hasattr(self, "energyNormPartialDualMatrix"): self.buildEnergyNormPartialDualForm() energyMat = self.energyNormPartialDualMatrix else: if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() energyMat = self.energyNormMatrix else: energyMat = self.outputNormMatrix if not isinstance(u, (np.ndarray,)): u = u.data if not isinstance(v, (np.ndarray,)): v = v.data if onlyDiag: return np.sum(dot(energyMat, u) * v.conj(), axis = 0) return dot(dot(energyMat, u).T, v.conj()).T def norm(self, u:Np2D, dual : bool = False, is_state : bool = True) -> Np1D: return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual, is_state = is_state)) ** .5 def checkAInBounds(self, derI : int = 0): """Check if derivative index is oob for operator of linear system.""" if derI < 0: d = self.spacedim return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)), shape = (d, d), dtype = np.complex) def checkbInBounds(self, derI : int = 0): """Check if derivative index is oob for RHS of linear system.""" if derI < 0: return np.zeros(self.spacedim, dtype = np.complex) def resetAs(self): """Reset (derivatives of) operator of linear system.""" self.setAs([None] * self.nAs) self.setthAs([None] * self.nAs) def resetbs(self): """Reset (derivatives of) RHS of linear system.""" self.setbs([None] * self.nbs) self.setthbs([None] * self.nbs) def getMonomialSingleWeight(self, deg:List[int]): return createMonomial(deg, True) def getMonomialWeights(self, n:int): return createMonomialList(n, self.npar, True) def setAs(self, As:List[Np2D]): """Assign terms of operator of linear system.""" if len(As) != self.nAs: raise RROMPyException(("Expected number {} of terms of As not " "matching given list length {}.").format(self.nAs, len(As))) self.As = [copy(A) for A in As] def setthAs(self, thAs:List[List[TupleAny]]): """Assign terms of operator of linear system.""" if len(thAs) != self.nAs: raise RROMPyException(("Expected number {} of terms of thAs not " "matching given list length {}.").format(self.nAs, len(thAs))) self.thAs = copy(thAs) def setbs(self, bs:List[Np1D]): """Assign terms of RHS of linear system.""" if len(bs) != self.nbs: raise RROMPyException(("Expected number {} of terms of bs not " "matching given list length {}.").format(self.nbs, len(bs))) self.bs = [copy(b) for b in bs] def setthbs(self, thbs:List[List[TupleAny]]): """Assign terms of RHS of linear system.""" if len(thbs) != self.nbs: raise RROMPyException(("Expected number {} of terms of thbs not " "matching given list length {}.").format(self.nbs, len(thbs))) self.thbs = copy(thbs) def _assembleObject(self, mu:paramVal, objs:ListAny, th:ListAny, derI:int) -> ScOp: """Assemble (derivative of) object from list of derivatives.""" mu = self.checkParameter(mu) rExp = self.rescalingExp muE = mu ** rExp obj = None for j in range(len(objs)): if len(th[j]) <= derI and th[j][-1] is not None: raise RROMPyException(("Cannot assemble operator. Non enough " "derivatives of theta provided.")) if len(th[j]) > derI and th[j][derI] is not None: expr = expressionEvaluator(th[j][derI], muE) if hasattr(expr, "__len__"): if len(expr) > 1: raise RROMPyException(("Size mismatch in value of " "theta function. Only scalars " "allowed.")) expr = expr[0] if obj is None: obj = expr * objs[j] else: obj = obj + expr * objs[j] return obj @abstractmethod def buildA(self): """Build terms of operator of linear system.""" if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs) if self.As[0] is None: self.As[0] = scsp.eye(self.spacedim, format = "csr") for j in range(self.nAs): if self.As[j] is None: self.As[j] = self.checkAInBounds(-1) def A(self, mu : paramVal = [], der : List[int] = 0) -> ScOp: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ derI = hashD(der) if hasattr(der, "__len__") else der Anull = self.checkAInBounds(derI) if Anull is not None: return Anull self.buildA() assembledA = self._assembleObject(mu, self.As, self.thAs, derI) if assembledA is None: return self.checkAInBounds(-1) return assembledA @abstractmethod def buildb(self): """Build terms of RHS of linear system.""" if self.thbs[0] is None: self.thbs = self.getMonomialWeights(self.nbs) for j in range(self.nbs): if self.bs[j] is None: self.bs[j] = self.checkbInBounds(-1) def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ derI = hashD(der) if hasattr(der, "__len__") else der bnull = self.checkbInBounds(derI) if bnull is not None: return bnull self.buildb() assembledb = self._assembleObject(mu, self.bs, self.thbs, derI) if assembledb is None: return self.checkbInBounds(-1) return assembledb def buildC(self): """Build terms of LHS of linear system.""" if self._C is None: self._C = 1. def applyC(self, u:sampList): """Apply LHS of linear system.""" return dot(self.C, u) def applyCpInv(self, u:sampList): """Apply pseudoinverse of LHS of linear system.""" return dot(customPInv(self.C), u) _isStateShiftZero = True def stateShift(self, mu : paramVal = []) -> Np1D: return np.zeros((self.spacedim, len(mu))) _isOutputShiftZero = True def outputShift(self, mu : paramVal = []) -> Np1D: return self.applyC(self.stateShift(mu)) def setSolver(self, solverType:str, solverArgs : DictAny = {}): """Choose solver type and parameters.""" self._solver, self._solverArgs = setupSolver(solverType, solverArgs) def solve(self, mu : paramList = [], RHS : sampList = None, - force_state : bool = False, verbose : bool = False) -> sampList: + return_state : bool = False, verbose : bool = False) -> sampList: """ Find solution of linear system. Args: mu: parameter value. RHS: RHS of linear system. If None, defaults to that of parametric system. Defaults to None. - force_state: whether to return state before multiplication by c. + return_state: whether to return state before multiplication by c. Defaults to False. verbose: whether to notify for each solution computed. Defaults to False. """ if mu == []: mu = self.mu0 mu = self.checkParameterList(mu)[0] if self.npar == 0: mu.reset((1, self.npar), mu.dtype) if len(mu) == 0: return emptySampleList() if RHS is None: RHS = [self.b(m) for m in mu] RHS = sampleList(RHS) mult = 0 if len(RHS) == 1 else 1 RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size") for j in range(len(mu)): u = tsolve(self.A(mu[j]), RHS[mult * j], self._solver, self._solverArgs) - if force_state: + if return_state: if j == 0: sol = emptySampleList() sol.reset((len(u), len(mu)), dtype = u.dtype) sol[j] = u else: if j == 0: sol = np.empty((len(u), len(mu)), dtype = u.dtype) sol[:, j] = u if verbose: print("." * (j % 5 != 4) + "*" * (j % 5 == 4), end = "") - if not force_state: + if not return_state: sol = sampleList(self.applyC(sol) - self.outputShift(mu)) #FIXME else: sol = sampleList(sol - self.stateShift(mu)) #FIXME if verbose: print() return sol def residual(self, mu : paramList = [], u : sampList = None, post_c : bool = True) -> sampList: """ Find residual of linear system for given approximate solution. Args: mu: parameter value. u: numpy complex array with function dofs. If None, set to 0. post_c: whether to post-process using c. Defaults to True. """ if mu == []: mu = self.mu0 mu = self.checkParameterList(mu)[0] if self.npar == 0: mu.reset((1, self.npar), mu.dtype) if len(mu) == 0: return emptySampleList() v = sampleList(self.stateShift(mu)) if u is not None: v = v + sampleList(u) #FIXME for j in range(len(mu)): r = self.b(mu[j]) - dot(self.A(mu[j]), v[j]) if post_c: if j == 0: res = np.empty((len(r), len(mu)), dtype = r.dtype) res[:, j] = r else: if j == 0: res = emptySampleList() res.reset((len(r), len(mu)), dtype = r.dtype) res[j] = r if post_c: res = sampleList(self.applyC(res)) return res def _rayleighQuotient(self, A:Np2D, v0:Np1D, M:Np2D, sigma : float = 0., nIterP : int = 10, nIterR : int = 10) -> float: nIterP = min(nIterP, len(v0) // 2) nIterR = min(nIterR, (len(v0) + 1) // 2) v0 /= dot(dot(M, v0).T, v0.conj()) ** .5 for j in range(nIterP): v0 = tsolve(A - sigma * M, dot(M, v0), self._solver, self._solverArgs) v0 /= dot(dot(M, v0).T, v0.conj()) ** .5 l0 = dot(A.dot(v0).T, v0.conj()) for j in range(nIterR): v0 = tsolve(A - l0 * M, dot(M, v0), self._solver, self._solverArgs) v0 /= dot(dot(M, v0).T, v0.conj()) ** .5 l0 = dot(A.dot(v0).T, v0.conj()) if np.isnan(l0): l0 = np.finfo(float).eps return np.abs(l0) def stabilityFactor(self, mu : paramList = [], u : sampList = None, nIterP : int = 10, nIterR : int = 10) -> sampList: """ Find stability factor of matrix of linear system using iterative inverse power iteration- and Rayleigh quotient-based procedure. Args: mu: parameter values. u: numpy complex arrays with function dofs. nIterP: number of iterations of power method. nIterR: number of iterations of Rayleigh quotient method. """ if mu == []: mu = self.mu0 mu = self.checkParameterList(mu)[0] if self.npar == 0: mu.reset((1, self.npar), mu.dtype) u = sampleList(u) solShift = self.stateShift(mu) if len(u) == len(mu): u = u + solShift #FIXME else: u = sampleList(solShift) + np.tile(u.data, (1, len(mu))) #FIXME stabFact = np.empty(len(mu), dtype = float) if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() for j in range(len(mu)): stabFact[j] = self._rayleighQuotient(self.A(mu[j]), u[j], self.energyNormMatrix, 0., nIterP, nIterR) return stabFact def plot(self, u:Np1D, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, pyplotArgs : dict = {}, **figspecs) -> str: """ Do some nice plots of the complex-valued function with given dofs. Args: u: numpy complex array with function dofs. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. pyplotArgs(optional): Optional arguments for pyplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filename. """ if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return if 'figsize' not in figspecs.keys(): figspecs['figsize'] = (13. * len(what) / 4, 3) subplotcode = 100 + len(what) * 10 idxs = np.arange(self.spacedim) if warping is not None: idxs = warping[0](np.arange(self.spacedim)) plt.figure(**figspecs) plt.jet() if 'ABS' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.abs(u).flatten(), **pyplotArgs) plt.title("|{0}|".format(name)) if 'PHASE' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.angle(u).flatten(), **pyplotArgs) plt.title("phase({0})".format(name)) if 'REAL' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.real(u).flatten(), **pyplotArgs) plt.title("Re({0})".format(name)) if 'IMAG' in what: subplotcode = subplotcode + 1 plt.subplot(subplotcode) plt.plot(idxs, np.imag(u).flatten(), **pyplotArgs) plt.title("Im({0})".format(name)) if save is not None: save = save.strip() fileOut = getNewFilename("{}_fig_".format(save), saveFormat) plt.savefig(fileOut, format = saveFormat, dpi = saveDPI) else: fileOut = None if show: plt.show() plt.close() return fileOut diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py index 5bca9bc..34e15c5 100644 --- a/rrompy/reduction_methods/base/generic_approximant.py +++ b/rrompy/reduction_methods/base/generic_approximant.py @@ -1,906 +1,907 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np from itertools import product as iterprod from copy import deepcopy as copy from os import remove as osrm from rrompy.sampling.standard import (SamplingEngineStandard, SamplingEngineStandardPOD) from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, Tuple, ListAny, strLst, paramVal, paramList, sampList) from rrompy.utilities.base import (purgeDict, verbosityManager as vbMng, getNewFilename) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPy_READY, RROMPy_FRAGILE) from rrompy.utilities.base import pickleDump, pickleLoad from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import sampleList, emptySampleList __all__ = ['GenericApproximant'] def addNormFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = False val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addNormDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = True if "dual" not in kwargs.keys(): kwargs["dual"] = True val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addPlotFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addPlotDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addOutParaviewFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaview"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.outParaview(u, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaview" + fieldName, objFunc) def addOutParaviewTimeDomainFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaviewTimeDomain"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) omega = args.pop(0) if len(args) > 0 else np.real(mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.outParaviewTimeDomain(u, omega, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc) def getTrainedModelClass(name): from importlib import import_module as im try: return getattr(im("rrompy.reduction_methods.trained_model"), name) except: raise RROMPyException("Trained model name not recognized.") class GenericApproximant: """ ABSTRACT ROM approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. trainedModel: Trained model evaluator. mu0: Default parameter. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList{Soft,Critical}. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ __all__ += [ftype + dtype for ftype, dtype in iterprod( ["norm", "plot", "outParaview", "outParaviewTimeDomain"], ["HF", "RHS", "Approx", "Res", "Err"])] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._mode = RROMPy_READY - self.force_state = force_state + self.approx_state = approx_state self.verbosity = verbosity self.timestamp = timestamp vbMng(self, "INIT", "Initializing engine of type {}.".format(self.name()), 10) self._HFEngine = HFEngine self.trainedModel = None self.lastSolvedHF = emptyParameterList() self.uHF = emptySampleList() self._addParametersToList(["POD"], [True], ["S"], [1]) if mu0 is None: if hasattr(self.HFEngine, "mu0"): self.mu0 = checkParameter(self.HFEngine.mu0) else: raise RROMPyException(("Center of approximation cannot be " "inferred from HF engine. Parameter " "required")) else: self.mu0 = checkParameter(mu0, self.HFEngine.npar) self.resetSamples() self.approxParameters = approxParameters self._postInit() ### add norm{HF,Err} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["HF", "Err"]: addNormFieldToClass(self, objName) ### add norm{RHS,Res} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["RHS", "Res"]: addNormDualFieldToClass(self, objName) ### add plot{HF,Approx,Err} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["HF", "Approx", "Err"]: addPlotFieldToClass(self, objName) ### add plot{RHS,Res} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["RHS", "Res"]: addPlotDualFieldToClass(self, objName) ### add outParaview{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file. Args: mu: Target parameter. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewFieldToClass(self, objName) ### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file, converted to time domain. Args: mu: Target parameter. omega(optional): frequency. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewTimeDomainFieldToClass(self, objName) def _preInit(self): if not hasattr(self, "depth"): self.depth = 0 else: self.depth += 1 @property def tModelType(self): raise RROMPyException("No trainedModel type assigned.") def initializeModelData(self, datadict): from rrompy.reduction_methods.trained_model import TrainedModelData return (TrainedModelData(datadict["mu0"], datadict.pop("projMat"), datadict["scaleFactor"], datadict.pop("rescalingExp")), ["mu0", "scaleFactor", "mus"]) @property def parameterList(self): """Value of parameterListSoft + parameterListCritical.""" return self.parameterListSoft + self.parameterListCritical def _addParametersToList(self, whatSoft:strLst, defaultSoft:ListAny, whatCritical : strLst = [], defaultCritical : ListAny = [], toBeExcluded : strLst = []): if not hasattr(self, "parameterToBeExcluded"): self.parameterToBeExcluded = [] self.parameterToBeExcluded = toBeExcluded + self.parameterToBeExcluded if not hasattr(self, "parameterListSoft"): self.parameterListSoft = [] if not hasattr(self, "parameterDefaultSoft"): self.parameterDefaultSoft = {} if not hasattr(self, "parameterListCritical"): self.parameterListCritical = [] if not hasattr(self, "parameterDefaultCritical"): self.parameterDefaultCritical = {} for j, what in enumerate(whatSoft): if what not in self.parameterToBeExcluded: self.parameterListSoft = [what] + self.parameterListSoft self.parameterDefaultSoft[what] = defaultSoft[j] for j, what in enumerate(whatCritical): if what not in self.parameterToBeExcluded: self.parameterListCritical = ([what] + self.parameterListCritical) self.parameterDefaultCritical[what] = defaultCritical[j] def _postInit(self): if self.depth == 0: vbMng(self, "DEL", "Done initializing.", 10) del self.depth else: self.depth -= 1 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEngineStandardPOD else: SamplingEngine = SamplingEngineStandard self.samplingEngine = SamplingEngine(self.HFEngine, - force_state = self.force_state, + sample_state = self.approx_state, verbosity = self.verbosity) @property def HFEngine(self): """Value of HFEngine.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): raise RROMPyException("Cannot change HFEngine.") @property def mu0(self): """Value of mu0.""" return self._mu0 @mu0.setter def mu0(self, mu0): mu0 = checkParameter(mu0) if not hasattr(self, "_mu0") or mu0 != self.mu0: self.resetSamples() self._mu0 = mu0 @property def npar(self): """Number of parameters.""" return self.mu0.shape[1] @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): if not hasattr(self, "approxParameters"): self._approxParameters = {} approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) keyList = list(approxParameters.keys()) for key in self.parameterListCritical: if key in keyList: setattr(self, "_" + key, self.parameterDefaultCritical[key]) for key in self.parameterListSoft: if key in keyList: setattr(self, "_" + key, self.parameterDefaultSoft[key]) fragile = False for key in self.parameterListCritical: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultCritical[key] getattr(self.__class__, key, None).fset(self, val) fragile = fragile or val is None for key in self.parameterListSoft: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultSoft[key] getattr(self.__class__, key, None).fset(self, val) if fragile: self._mode = RROMPy_FRAGILE @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): if hasattr(self, "_POD"): PODold = self.POD else: PODold = -1 self._POD = POD self._approxParameters["POD"] = self.POD if PODold != self.POD: self.samplingEngine = None self.resetSamples() @property - def force_state(self): - """Value of force_state.""" - return self._force_state - @force_state.setter - def force_state(self, force_state): - if hasattr(self, "_force_state"): force_stateold = self.force_state - else: force_stateold = -1 - self._force_state = force_state - if force_stateold != self.force_state: + def approx_state(self): + """Value of approx_state.""" + return self._approx_state + @approx_state.setter + def approx_state(self, approx_state): + if hasattr(self, "_approx_state"): approx_stateold = self.approx_state + else: approx_stateold = -1 + self._approx_state = approx_state + if approx_stateold != self.approx_state: self.samplingEngine = None self.resetSamples() @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if S <= 0: raise RROMPyException("S must be positive.") if hasattr(self, "_S") and self._S is not None: Sold = self.S else: Sold = -1 self._S = S self._approxParameters["S"] = self.S if Sold != self.S: self.resetSamples() @property def trainedModel(self): """Value of trainedModel.""" return self._trainedModel @trainedModel.setter def trainedModel(self, trainedModel): self._trainedModel = trainedModel if self._trainedModel is not None: self._trainedModel.lastSolvedApproxReduced = emptyParameterList() self._trainedModel.lastSolvedApprox = emptyParameterList() self.lastSolvedApproxReduced = emptyParameterList() self.lastSolvedApprox = emptyParameterList() self.uApproxReduced = emptySampleList() self.uApprox = emptySampleList() def resetSamples(self): if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() else: self.setupSampling() self._mode = RROMPy_READY def plotSamples(self, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, plotArgs : dict = {}, **figspecs) -> List[str]: """ Do some nice plots of the samples. Args: warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. plotArgs(optional): Optional arguments for fen/pyplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot plot samples.") return self.samplingEngine.plotSamples(warping, name, save, what, saveFormat, saveDPI, show, plotArgs, **figspecs) def outParaviewSamples(self, name : str = "u", filename : str = "out", times : Np1D = None, what : strLst = 'all', forceNewFile : bool = True, folders : bool = False, filePW = None) -> List[str]: """ Output samples to ParaView file. Args: name(optional): Base name to be used for data output. filename(optional): Name of output file. times(optional): Timestamps. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. filePW(optional): Fenics File entity (for time series). Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewSamples(name, folders, filename, times, what, forceNewFile, filePW) def outParaviewTimeDomainSamples(self, omegas : Np1D = None, timeFinal : Np1D = None, periodResolution : int = 20, name : str = "u", filename : str = "out", forceNewFile : bool = True, folders : bool = False) -> List[str]: """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewTimeDomainSamples( omegas, timeFinal, periodResolution, name, folders, filename, forceNewFile) def setSamples(self, samplingEngine): """Copy samplingEngine and samples.""" vbMng(self, "INIT", "Transfering samples.", 10) self.samplingEngine = copy(samplingEngine) vbMng(self, "DEL", "Done transfering samples.", 10) def setTrainedModel(self, model): """Deepcopy approximation from trained model.""" if hasattr(model, "storeTrainedModel"): verb = model.verbosity model.verbosity = 0 fileOut = model.storeTrainedModel() model.verbosity = verb else: try: fileOut = getNewFilename("trained_model", "pkl") pickleDump(model.data.__dict__, fileOut) except: raise RROMPyException(("Failed to store model data. Parameter " "model must have either " "storeTrainedModel or " "data.__dict__ properties.")) self.loadTrainedModel(fileOut) osrm(fileOut) @abstractmethod def setupApprox(self): """ Setup approximant. (ABSTRACT) Any specialization should include something like if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") ... self.trainedModel = ... self.trainedModel.data = ... self.trainedModel.data.approxParameters = copy( self.approxParameters) """ pass def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None and self.trainedModel.data.approxParameters == self.approxParameters) def _pruneBeforeEval(self, mu:paramList, field:str, append:bool, prune:bool) -> Tuple[paramList, Np1D, Np1D, bool]: mu = checkParameterList(mu, self.npar)[0] idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muExtra = emptyParameterList() lastSolvedMus = getattr(self, "lastSolved" + field) if (len(mu) > 0 and len(mu) == len(lastSolvedMus) and mu == lastSolvedMus): idx = np.arange(len(mu), dtype = np.int) return muExtra, jExtra, idx, True muKeep = copy(muExtra) for j in range(len(mu)): jPos = lastSolvedMus.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: lastSolvedu = getattr(self, "u" + field) idx[~jExtra] = getattr(self.__class__, "set" + field)(self, muKeep, lastSolvedu[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu return muExtra, jExtra, idx, append def _setObject(self, mu:paramList, field:str, object:sampList, append:bool) -> List[int]: newMus = checkParameterList(mu, self.npar)[0] newObj = sampleList(object) if append: getattr(self, "lastSolved" + field).append(newMus) getattr(self, "u" + field).append(newObj) Ltot = len(getattr(self, "u" + field)) return list(range(Ltot - len(newObj), Ltot)) setattr(self, "lastSolved" + field, copy(newMus)) setattr(self, "u" + field, copy(newObj)) return list(range(len(getattr(self, "u" + field)))) def setHF(self, muHF:paramList, uHF:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muHF, "HF", uHF, append) def evalHF(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Find high fidelity solution with original parameters and arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "HF", append, prune) if len(muExtra) > 0: vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15) newuHFs = self.HFEngine.solve(muExtra) vbMng(self, "DEL", "Done solving HF model.", 15) idx[jExtra] = self.setHF(muExtra, newuHFs, append) return list(idx) def setApproxReduced(self, muApproxR:paramList, uApproxR:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApproxR, "ApproxReduced", uApproxR, append) def evalApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "ApproxReduced", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApproxReduced(muExtra) idx[jExtra] = self.setApproxReduced(muExtra, newuApproxs, append) return list(idx) def setApprox(self, muApprox:paramList, uApprox:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApprox, "Approx", uApprox, append) def evalApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "Approx", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApprox(muExtra) idx[jExtra] = self.setApprox(muExtra, newuApproxs, append) return list(idx) def getHF(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get HF solution at arbitrary parameter. Args: mu: Target parameter. Returns: HFsolution. """ mu = checkParameterList(mu, self.npar)[0] idx = self.evalHF(mu, append = append, prune = prune) return self.uHF(idx) def getRHS(self, mu:paramList) -> sampList: """ Get linear system RHS at arbitrary parameter. Args: mu: Target parameter. Returns: Linear system RHS. """ return self.HFEngine.residual(mu, None) def getApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Reduced approximant. """ mu = checkParameterList(mu, self.npar)[0] idx = self.evalApproxReduced(mu, append = append, prune = prune) return self.uApproxReduced(idx) def getApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant. """ mu = checkParameterList(mu, self.npar)[0] idx = self.evalApprox(mu, append = append, prune = prune) return self.uApprox(idx) def getRes(self, mu:paramList) -> sampList: """ Get residual at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant residual. """ - if not (self.force_state or self.HFEngine.isCEye): + if not (self.approx_state or self.HFEngine.isCEye): raise RROMPyException(("Residual of solution with non-scalar C " "not computable.")) return self.HFEngine.residual(mu, self.getApprox(mu) / self.HFEngine.C) def getErr(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get error at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant error. """ return (self.getApprox(mu, append = append, prune =prune) - self.getHF(mu, append = append, prune = prune)) def normApprox(self, mu:paramList) -> float: """ Compute norm of approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of approximant. """ if not (self.POD and self.HFEngine.isCEye): return self.HFEngine.norm(self.getApprox(mu), is_state = False) return np.linalg.norm(self.C * self.getApproxReduced(mu).data, axis = 0) def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ self.setupApprox() vbMng(self, "INIT", "Computing poles of model.", 20) poles = self.trainedModel.getPoles(*args, **kwargs) vbMng(self, "DEL", "Done computing poles.", 20) return poles def storeTrainedModel(self, filenameBase : str = "trained_model", forceNewFile : bool = True) -> str: """Store trained reduced model to file.""" self.setupApprox() vbMng(self, "INIT", "Storing trained model to file.", 20) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) pickleDump(self.trainedModel.data.__dict__, filename) vbMng(self, "DEL", "Done storing trained model.", 20) return filename def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" vbMng(self, "INIT", "Loading pre-trained model from file.", 20) datadict = pickleLoad(filename) self.mu0 = datadict["mu0"] self.scaleFactor = datadict["scaleFactor"] self.mus = datadict["mus"] trainedModel = self.tModelType() trainedModel.verbosity = self.verbosity trainedModel.timestamp = self.timestamp data, selfkeys = self.initializeModelData(datadict) for key in selfkeys: setattr(self, key, datadict.pop(key)) approxParameters = datadict.pop("approxParameters") data.approxParameters = copy(approxParameters) for apkey in data.approxParameters.keys(): self._approxParameters[apkey] = approxParameters.pop(apkey) for key in datadict: setattr(data, key, datadict[key]) trainedModel.data = data self.trainedModel = trainedModel self._mode = RROMPy_FRAGILE vbMng(self, "DEL", "Done loading pre-trained model.", 20) diff --git a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py index ea97708..d8a1891 100644 --- a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py +++ b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py @@ -1,677 +1,678 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.standard.generic_standard_approximant \ import GenericStandardApproximant from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple, List, normEng, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.expression import expressionEvaluator from rrompy.solver import normEngine from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList, emptyParameterList __all__ = ['GenericGreedyApproximant'] def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D: return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)]) - badmus[..., np.newaxis].T, axis = 1) def pruneSamples(mus:paramList, badmus:paramList, tol : float = 1e-8) -> Np1D: """Remove from mus all the elements which are too close to badmus.""" if len(badmus) == 0: return mus proximity = np.min(localL2Distance(mus.data, badmus.data), axis = 1) return np.arange(len(mus))[proximity <= tol] class GenericGreedyApproximant(GenericStandardApproximant): """ ROM greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: Uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ TOL_INSTABILITY = 1e-6 def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["greedyTol", "collinearityTol", "interactive", "maxIter", "refinementRatio", "nTestPoints"], [1e-2, 0., False, 1e2, .2, 5e2], ["trainSetGenerator"], ["AUTO"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = force_state, verbosity = verbosity, + approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def collinearityTol(self): """Value of collinearityTol.""" return self._collinearityTol @collinearityTol.setter def collinearityTol(self, collinearityTol): if collinearityTol < 0: raise RROMPyException("collinearityTol must be non-negative.") if (hasattr(self, "_collinearityTol") and self.collinearityTol is not None): collinearityTolold = self.collinearityTol else: collinearityTolold = -1 self._collinearityTol = collinearityTol self._approxParameters["collinearityTol"] = self.collinearityTol if collinearityTolold != self.collinearityTol: self.resetSamples() @property def interactive(self): """Value of interactive.""" return self._interactive @interactive.setter def interactive(self, interactive): self._interactive = interactive @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def refinementRatio(self): """Value of refinementRatio.""" return self._refinementRatio @refinementRatio.setter def refinementRatio(self, refinementRatio): if refinementRatio <= 0. or refinementRatio > 1.: raise RROMPyException(("refinementRatio must be between 0 " "(excluded) and 1.")) if (hasattr(self, "_refinementRatio") and self.refinementRatio is not None): refinementRatioold = self.refinementRatio else: refinementRatioold = -1 self._refinementRatio = refinementRatio self._approxParameters["refinementRatio"] = self.refinementRatio if refinementRatioold != self.refinementRatio: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def trainSetGenerator(self): """Value of trainSetGenerator.""" return self._trainSetGenerator @trainSetGenerator.setter def trainSetGenerator(self, trainSetGenerator): if (isinstance(trainSetGenerator, (str,)) and trainSetGenerator.upper() == "AUTO"): trainSetGenerator = self.sampler if 'generatePoints' not in dir(trainSetGenerator): raise RROMPyException("trainSetGenerator type not recognized.") if (hasattr(self, '_trainSetGenerator') and self.trainSetGenerator not in [None, "AUTO"]): trainSetGeneratorOld = self.trainSetGenerator self._trainSetGenerator = trainSetGenerator self._approxParameters["trainSetGenerator"] = self.trainSetGenerator if (not 'trainSetGeneratorOld' in locals() or trainSetGeneratorOld != self.trainSetGenerator): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = emptyParameterList() def initEstimatorNormEngine(self, normEngn : normEng = None): """Initialize estimator norm engine.""" if (normEngn is not None or not hasattr(self, "estimatorNormEngine") or self.estimatorNormEngine is None): if normEngn is None: if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"): self.HFEngine.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( self.HFEngine.energyNormPartialDualMatrix) else: if hasattr(normEngn, "buildEnergyNormPartialDualForm"): if not hasattr(normEngn, "energyNormPartialDualMatrix"): normEngn.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( normEngn.energyNormPartialDualMatrix) else: estimatorEnergyMatrix = normEngn self.estimatorNormEngine = normEngine(estimatorEnergyMatrix) def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \ -> Tuple[Np1D, Np1D, Np1D]: self.assembleReducedResidualBlocks(full = True) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0) if rA is None: return ff # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2) * rb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2) * rA.conj(), axis = (0, 1)) return ff, Lf, LL def errorEstimator(self, mus:Np1D) -> Np1D: """Standard residual-based error estimator.""" self.setupApprox() mus = checkParameterList(mus, self.npar)[0] vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 uApproxRs = self.getApproxReduced(mus) muTestEff = mus ** self.HFEngine.rescalingExp radiusA = np.empty((len(self.HFEngine.thAs), len(mus)), dtype = np.complex) radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): radiusA[j] = expressionEvaluator(thA[0], muTestEff) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) radiusA = np.expand_dims(uApproxRs.data, 1) * radiusA ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) return err def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: """ Compute maximum of (and index of maximum of) error estimator over given parameters. """ errorEstTest = self.errorEstimator(mus) idxMaxEst = [np.argmax(errorEstTest)] return errorEstTest, idxMaxEst, errorEstTest[idxMaxEst] def _isLastSampleCollinear(self) -> bool: """Check collinearity of last sample.""" if self.collinearityTol <= 0.: return False if self.POD: reff = self.samplingEngine.RPOD[:, -1] else: RROMPyWarning(("Repeated orthogonalization of the samples for " "collinearity check. Consider setting POD to " "True.")) if not hasattr(self, "_PODEngine"): from rrompy.sampling.base.pod_engine import PODEngine self._PODEngine = PODEngine(self.HFEngine) reff = self._PODEngine.generalizedQR(self.samplingEngine.samples, only_R = True, is_state = True)[:, -1] cLevel = np.abs(reff[-1]) / np.linalg.norm(reff) vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 5) return cLevel < self.collinearityTol def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") mus = copy(self.muTest[muidx]) self.muTest.pop(muidx) for mu in mus: vbMng(self, "MAIN", ("Adding sample point no. {} at {} to training " "set.").format(self.samplingEngine.nsamples + 1, mu), 2) self.mus.append(mu) self.samplingEngine.nextSample(mu) if self._isLastSampleCollinear(): RROMPyWarning("Collinearity above tolerance detected.") errorEstTest = np.empty(len(self.muTest)) errorEstTest[:] = np.nan return errorEstTest, [-1], np.nan, np.nan errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator( self.muTest) if (plotEst and not np.any(np.isnan(errorEstTest)) and not np.any(np.isinf(errorEstTest))): musre = copy(self.muTest.re.data) from matplotlib import pyplot as plt plt.figure() errCP = copy(errorEstTest) while len(musre) > 0: if self.npar == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, 1 :] - musre[0, 1 :]), 1), 0.))[0] plt.semilogy(musre[currIdx, 0], errCP[currIdx], 'k', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) plt.semilogy([self.muTest.re(0, 0), self.muTest.re(-1, 0)], [self.greedyTol] * 2, 'r--') plt.semilogy(self.mus.re(0), 2. * self.greedyTol * np.ones(len(self.mus)), '*m') plt.semilogy(self.muTest.re(muidx, 0), maxErrorEst, 'xr') plt.grid() plt.show() plt.close() return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self.computeScaleFactor() self.resetSamples() self.mus = self.trainSetGenerator.generatePoints(self.S)[ list(range(self.S))] muTestBase = self.sampler.generatePoints(self.nTestPoints) idxPop = pruneSamples(muTestBase ** self.HFEngine.rescalingExp, self.mus ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestBase.pop(idxPop) muTestBase = muTestBase.sort() muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 2) self.samplingEngine.iterSample(self.mus) self.muTest = emptyParameterList() self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) self.muTest[: -1] = muTestBase.data self.muTest[-1] = muLast.data def _enrichTestSet(self, nTest:int): """Add extra elements to test set.""" RROMPyAssert(self._mode, message = "Cannot enrich test set.") muTestExtra = self.sampler.generatePoints(2 * nTest) muTotal = copy(self.mus) muTotal.append(self.muTest) idxPop = pruneSamples(muTestExtra ** self.HFEngine.rescalingExp, muTotal ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestExtra.pop(idxPop) muTestNew = np.empty((len(self.muTest) + len(muTestExtra), self.muTest.shape[1]), dtype = np.complex) muTestNew[: len(self.muTest)] = self.muTest.data muTestNew[len(self.muTest) :] = muTestExtra.data self.muTest = checkParameterList(muTestNew, self.npar)[0].sort() vbMng(self, "MAIN", "Enriching test set by {} elements.".format(len(muTestExtra)), 5) def greedy(self, plotEst : bool = False): """Compute greedy snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return vbMng(self, "INIT", "Starting computation of snapshots.", 2) self._preliminaryTraining() nTest = self.nTestPoints muT0 = copy(self.muTest[-1]) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( [len(self.muTest) - 1], plotEst) if np.any(np.isnan(maxErrorEst)): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop termination.")) self.muTest.append(muT0) self.mus.pop(-1) self.samplingEngine.popSample() self.setupApprox() else: vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(np.max(maxErrorEst)), 2) trainedModelOld = copy(self.trainedModel) while (self.samplingEngine.nsamples < self.maxIter and np.max(maxErrorEst) > self.greedyTol): if (1. - self.refinementRatio) * nTest > len(self.muTest): self._enrichTestSet(nTest) nTest = len(self.muTest) muTestOld, maxErrorEstOld = self.muTest, np.max(maxErrorEst) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( muidx, plotEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(np.max(maxErrorEst)), 2) if (np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst)) or maxErrorEstOld < (np.max(maxErrorEst) * self.TOL_INSTABILITY)): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop " "termination.")) self.muTest = muTestOld self.mus.pop(-1) self.samplingEngine.popSample() self.trainedModel.data = copy(trainedModelOld.data) break trainedModelOld.data = copy(self.trainedModel.data) if (self.interactive and np.max(maxErrorEst) <= self.greedyTol): vbMng(self, "MAIN", ("Required tolerance {} achieved. Want to decrease " "greedyTol and continue? " "Y/N").format(self.greedyTol), 0, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": vbMng(self, "MAIN", "Reducing value of greedyTol...", 0) while np.max(maxErrorEst) <= self._greedyTol: self._greedyTol *= .5 if (self.interactive and self.samplingEngine.nsamples >= self.maxIter): vbMng(self, "MAIN", ("Maximum number of iterations {} reached. Want to " "increase maxIter and continue? " "Y/N").format(self.maxIter), 0, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": vbMng(self, "MAIN", "Doubling value of maxIter...", 0) self._maxIter *= 2 vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(self.samplingEngine.nsamples), 2) def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return (super().checkComputedApprox() and len(self.mus) == self.trainedModel.data.projMat.shape[1]) def assembleReducedResidualGramian(self, pMat:sampList): """ Build residual gramian of reduced linear system through projections. """ self.initEstimatorNormEngine() if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.estimatorNormEngine.innerProduct(pMat, pMat) else: Sold = self.trainedModel.data.gramian.shape[0] S = len(self.mus) if Sold > S: gramian = self.trainedModel.data.gramian[: S, : S] else: idxOld = list(range(Sold)) idxNew = list(range(Sold, S)) gramian = np.empty((S, S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxOld))) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxNew))) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D]): """ Build blocks (of type bb) of reduced linear system through projections. """ self.initEstimatorNormEngine() nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = bs[i] resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi) for j in range(i): Mbj = bs[j] resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj, Mbi) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:sampList): """ Build blocks (of type Ab) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) nbs = len(bs) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) for j in range(nAs): MAj = dot(As[j], pMat) for i in range(nbs): Mbi = bs[i] resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj, Mbi) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == S: return if Sold > S: resAb = self.trainedModel.data.resAb[:, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): MAj = dot(As[j], pMat[:, Sold :]) for i in range(nbs): Mbi = bs[i] resAb[i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj, Mbi)) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList): """ Build blocks (of type AA) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) for i in range(nAs): MAi = dot(As[i], pMat) resAA[:, i, :, i] = ( self.estimatorNormEngine.innerProduct(MAi, MAi)) for j in range(i): MAj = dot(As[j], pMat) resAA[:, i, :, j] = ( self.estimatorNormEngine.innerProduct(MAj, MAi)) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == S: return if Sold > S: resAA = self.trainedModel.data.resAA[: S, :, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): MAi = dot(As[i], pMat) resAA[: Sold, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) for j in range(i): MAj = dot(As[j], pMat) resAA[: Sold, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, : Sold], MAi[:, Sold :])) resAA[Sold :, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, Sold :])) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of affine decomposition of residual.""" self.assembleReducedResidualBlocksbb(self.HFEngine.bs) if full: pMat = self.samplingEngine.samples self.assembleReducedResidualBlocksAb(self.HFEngine.As, self.HFEngine.bs, pMat) self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat) diff --git a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py index 92de7cb..c52dcfd 100644 --- a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py @@ -1,497 +1,497 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_greedy_approximant import (GenericGreedyApproximant, localL2Distance as lL2D) from rrompy.utilities.poly_fitting.polynomial import (polybases, polydomcoeff, PolynomialInterpolator as PI, polyvanderTotal as pvT) from rrompy.utilities.numerical import totalDegreeN, dot from rrompy.utilities.expression import expressionEvaluator from rrompy.reduction_methods.standard import RationalInterpolant from rrompy.utilities.base.types import (Np1D, Tuple, DictAny, HFEng, paramVal, paramList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting import customFit from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert) from rrompy.parameter import checkParameterList __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'polybasis': type of basis for interpolation; defaults to 'MONOMIAL'; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'INTERPOLATORY', 'EIM_INTERPOLATORY', and 'EIM_DIAGONAL'; defaults to 'AFFINE'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults and must + approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'interpRcond': tolerance for interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. robustTol: tolerance for robust rational denominator management. errorEstimatorKind: kind of error estimator. interpRcond: tolerance for interpolation. robustTol: tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ _allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "INTERPOLATORY", "EIM_INTERPOLATORY", "EIM_DIAGONAL"] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = True, + approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): - if not force_state: RROMPyWarning("Overriding force_state to True.") + if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList(["errorEstimatorKind"], ["AFFINE"], toBeExcluded = ["M", "N", "polydegreetype", "radialDirectionalWeights", "nNearestNeighbor"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = True, verbosity = verbosity, + approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def E(self): """Value of E.""" self._E = self.sampleBatchIdx - 1 return self._E @E.setter def E(self, E): RROMPyWarning(("E is used just to simplify inheritance, and its value " "cannot be changed from that of sampleBatchIdx - 1.")) @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " "to 'AFFINE'.")) errorEstimatorKind = "AFFINE" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind def errorEstimator(self, mus:Np1D) -> Np1D: """Standard residual-based error estimator.""" if self.errorEstimatorKind == "AFFINE": return super().errorEstimator(mus) setupOK = self.setupApprox() if not setupOK: err = np.empty(len(mus)) err[:] = np.nan return err if self.errorEstimatorKind == "DIAGONAL": return self.errorEstimatorEIM(mus) mus = checkParameterList(mus, self.npar)[0] muCTest = self.trainedModel.centerNormalize(mus) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) if self.errorEstimatorKind == "DISCREPANCY": nAs, nbs = len(self.HFEngine.thAs), len(self.HFEngine.thbs) muTrainEff = self.mus ** self.HFEngine.rescalingExp muTestEff = mus ** self.HFEngine.rescalingExp PTrain = self.trainedModel.getPVal(self.mus).data.T QTrain = self.trainedModel.getQVal(self.mus) PTest = self.trainedModel.getPVal(mus).data radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), dtype = np.complex) radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) radiusb = np.empty((nbs, len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): idxs = j * self.S + np.arange(self.S) radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], muTrainEff, (self.S, 1)) * PTrain radiusA[:, j] = PTest * expressionEvaluator(thA[0], muTestEff, (len(mus),)) for j, thb in enumerate(self.HFEngine.thbs): idx = nAs * self.S + j radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], muTrainEff, (self.S,)) radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, (len(mus),)) QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) vanTrain, _, vanTrainIdxs = pvT(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder) interpPQ = customFit(vanTrain[:, vanTrainIdxs], radiusAbTrain, rcond = self.interpRcond) vanTest, _, vanTestIdxs = pvT(muCTest, self.N, self.polybasis0) DradiusAb = vanTest[:, vanTestIdxs].dot(interpPQ) radiusA = (radiusA - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) radiusb = radiusb - DradiusAb[:, - nbs :].T ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5 else: #if self.errorEstimatorKind == "INTERPOLATORY": muCTrain = self.trainedModel.centerNormalize(self.mus) samplingRatio = np.prod(lL2D(muCTest.data, muCTrain.data), axis = 1) / np.abs(QTest) self.initEstimatorNormEngine() QTest = np.abs(QTest) sampRCP = copy(samplingRatio) idx_muTestSample = np.empty(self.sampleBatchSize, dtype = int) for j in range(self.sampleBatchSize): k = np.argmax(sampRCP) idx_muTestSample[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) sampRCP *= np.linalg.norm(musZero.data, axis = 1) mu_muTestSample = mus[idx_muTestSample] app_muTestSample = self.getApproxReduced(mu_muTestSample) app_muTestSample = dot(self.samplingEngine.samples, app_muTestSample) resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, post_c = False) RHSmu = self.HFEngine.residual(mu_muTestSample, None, post_c = False) ressamples = (self.estimatorNormEngine.norm(resmu) / self.estimatorNormEngine.norm(RHSmu)) musT = copy(self.mus) musT.append(mu_muTestSample) musT = self.trainedModel.centerNormalize(musT) musC = self.trainedModel.centerNormalize(mus) resT = np.zeros(len(musT), dtype = np.complex) err = np.zeros(len(mus)) for l in range(len(mu_muTestSample)): resT[len(self.mus) + l] = (ressamples[l] * QTest[idx_muTestSample[l]]) p = PI() wellCond, msg = p.setupByInterpolation(musT, resT, self.M + 1, self.polybasis, self.verbosity >= 15, True, {}, {"rcond": self.interpRcond}) err += np.abs(p(musC)) resT[len(self.mus) + l] = 0. err /= QTest vbMng(self, "MAIN", msg, 15) self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) return err def errorEstimatorEIM(self, mus:Np1D, return_max_idxs : bool = False) -> Np1D: """EIM-like interpolation error estimator.""" setupOK = self.setupApprox() if not setupOK: err = np.empty(len(mus)) err[:] = np.nan return err mus = checkParameterList(mus, self.npar)[0] vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) muCTest = self.trainedModel.centerNormalize(mus) muCTrain = self.trainedModel.centerNormalize(self.mus) vanderTest, _, vanderTestR = pvT(muCTest, self.E, self.polybasis) vanderTest = vanderTest[:, vanderTestR] vanderTestNext, _, vanderTestNextR = pvT(muCTest, self.E + 1, self.polybasis) vanderTestNext = vanderTestNext[:, vanderTestNextR[ vanderTest.shape[1] :]] idxsTest = np.arange(vanderTestNext.shape[1]) basis = np.zeros((len(idxsTest), 0), dtype = float) idxMaxEst = [] err = None while len(idxsTest) > 0: vanderTrial, _, vanderTrialR = pvT(muCTrain, self.E, self.polybasis) vanderTrial = vanderTrial[:, vanderTrialR] vanderTrialNext, _, vanderTrialNextR = pvT(muCTrain, self.E + 1, self.polybasis) vanderTrialNext = vanderTrialNext[:, vanderTrialNextR[ vanderTrial.shape[1] :]] vanderTrial = np.hstack((vanderTrial, vanderTrialNext.dot(basis).reshape( len(vanderTrialNext), basis.shape[1]))) valuesTrial = vanderTrialNext[:, idxsTest] vanderTestEff = np.hstack((vanderTest, vanderTestNext.dot(basis).reshape( len(vanderTestNext), basis.shape[1]))) vanderTestNextEff = vanderTestNext[:, idxsTest] coeffTest = np.linalg.solve(vanderTrial, valuesTrial) errTest = np.abs((vanderTestNextEff - vanderTestEff.dot(coeffTest)) / np.expand_dims(QTest, 1)) idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) idxMaxEst += [idxMaxErr[0]] if err is None: err = np.max(errTest, axis = 1) if not return_max_idxs: break muCTrain.append(muCTest[idxMaxErr[0]]) basis = np.pad(basis, [(0, 0), (0, 1)], "constant") basis[idxsTest[idxMaxErr[1]], -1] = 1. idxsTest = np.delete(idxsTest, idxMaxErr[1]) if self.errorEstimatorKind == "EIM_DIAGONAL": self.assembleReducedResidualBlocks(full = False) muTestEff = mus ** self.HFEngine.rescalingExp radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) bresb = self._affineResidualMatricesContraction(radiusb) self.assembleReducedResidualGramian(self.trainedModel.data.projMat) pDom = (polydomcoeff(self.E, self.polybasis) * self.trainedModel.data.P[(-1,) + (0,) * (self.npar - 1)]) LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom)) if not hasattr(self, "Anorm2Approx"): if self.HFEngine.nAs > 1: Ader = self.HFEngine.A(self.mu0, [1] + [0] * (self.npar - 1)) try: Adiag = self.scaleFactor[0] * Ader.diagonal() except: Adiag = self.scaleFactor[0] * np.diagonal(Ader) self.Anorm2Approx = np.mean(np.abs(Adiag) ** 2.) if (np.isclose(self.Anorm2Approx, 0.) or self.HFEngine.nAs <= 1): self.Anorm2Approx = 1 jOpt = np.abs(self.Anorm2Approx * LL / bresb) ** .5 err = jOpt * err else: #if self.errorEstimatorKind == "EIM_INTERPOLATORY": self.initEstimatorNormEngine() mu_muTestSample = mus[idxMaxEst[0]] app_muTestSample = self.getApproxReduced(mu_muTestSample) app_muTestSample = dot(self.samplingEngine.samples, app_muTestSample) resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, post_c = False) RHSmu = self.HFEngine.residual(mu_muTestSample, None, post_c = False) jOpt = np.abs(self.estimatorNormEngine.norm(resmu)[0] / err[idxMaxEst[0]] / self.estimatorNormEngine.norm(RHSmu)[0]) err = jOpt * err self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if return_max_idxs: return err, idxMaxEst return err def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: """ Compute maximum of (and index of maximum of) error estimator over given parameters. """ if self.errorEstimatorKind[: 4] == "EIM_": errorEstTest, idxMaxEst = self.errorEstimatorEIM(mus, True) else: errorEstTest = self.errorEstimator(mus) idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) errCP = copy(errorEstTest) for j in range(self.sampleBatchSize): k = np.argmax(errCP) idxMaxEst[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) errCP *= np.linalg.norm(musZero.data, axis = 1) maxEst = errorEstTest[idxMaxEst] return errorEstTest, idxMaxEst, maxEst def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") self.sampleBatchIdx += 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) return super().greedyNextSample(muidx, plotEst) def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return S = self.S self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0 nextBatchSize = 1 while self._S + nextBatchSize <= S: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize self._S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) super()._preliminaryTraining() def setupApprox(self, plotEst : bool = False): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return True RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.".format(self.name()), 5) self.greedy(plotEst) self._S = len(self.mus) self._N, self._M = [self.E] * 2 pMat = self.samplingEngine.samples.data - pMatEff = dot(self.HFEngine.C, pMat) if self.force_state else pMat + pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.mus = copy(self.mus) self.catchInstability = True if self.N > 0: try: Q = self._setupDenominator()[0] except RROMPyException as RE: RROMPyWarning(RE) vbMng(self, "DEL", "Done setting up approximant.", 5) return False else: Q = PI() Q.coeffs = np.ones(1, dtype = np.complex) Q.npar = 1 Q.polybasis = self.polybasis self.trainedModel.data.Q = copy(Q) try: self.trainedModel.data.P = copy(self._setupNumerator()) except RROMPyException as RE: RROMPyWarning(RE) vbMng(self, "DEL", "Done setting up approximant.", 5) return False self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) return True diff --git a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py index ca2c0bb..82b22ff 100644 --- a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py +++ b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py @@ -1,183 +1,183 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.reduction_methods.standard import ReducedBasis from rrompy.utilities.base.types import DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyAssert __all__ = ['ReducedBasisGreedy'] class ReducedBasisGreedy(GenericGreedyApproximant, ReducedBasis): """ ROM greedy RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults and must + approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix. bs: List of numpy vectors representing coefficients of linear system RHS. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix. bRBs: List of numpy vectors representing coefficients of compressed linear system RHS. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = True, + approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): - if not force_state: RROMPyWarning("Overriding force_state to True.") + if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList([], [], toBeExcluded = ["R", "PODTolerance"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = True, verbosity = verbosity, + approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def R(self): """Value of R.""" self._R = self._S return self._R @R.setter def R(self, R): RROMPyWarning(("R is used just to simplify inheritance, and its value " "cannot be changed from that of S.")) @property def PODTolerance(self): """Value of PODTolerance.""" self._PODTolerance = -1 return self._PODTolerance @PODTolerance.setter def PODTolerance(self, PODTolerance): RROMPyWarning(("PODTolerance is used just to simplify inheritance, " "and its value cannot be changed from -1.")) def setupApprox(self, plotEst : bool = False): """Compute RB projection matrix.""" if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.greedy(plotEst) vbMng(self, "INIT", "Computing projection matrix.", 7) pMat = self.samplingEngine.samples.data - pMatEff = dot(self.HFEngine.C, pMat) if self.force_state else pMat + pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} data = self.initializeModelData(datadict)[0] ARBs, bRBs = self.assembleReducedSystem(pMat) data.affinePoly = self.HFEngine.affinePoly data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs self.trainedModel.data = data else: self.trainedModel = self.trainedModel Sold = self.trainedModel.data.projMat.shape[1] ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMat[:, : Sold]) self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs vbMng(self, "DEL", "Done computing projection matrix.", 7) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) diff --git a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py index 0e4811a..06c56f4 100644 --- a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py +++ b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py @@ -1,538 +1,539 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( polybases as mlspb, MovingLeastSquaresInterpolator as MLSI) from rrompy.sampling.pivoted import (SamplingEnginePivoted, SamplingEnginePivotedPOD) from rrompy.utilities.base.types import ListAny, DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import (fullDegreeN, totalDegreeN, nextDerivativeIndices) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import emptyParameterList __all__ = ['GenericPivotedApproximant'] class GenericPivotedApproximant(GenericApproximant): """ ROM pivoted approximant (with pole matching) computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'cutOffTolerance': tolerance for ignoring parasitic poles; defaults to np.inf; - 'cutOffType': rule for tolerance computation for parasitic poles; defaults to 'MAGNITUDE'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'MMarginal': degree of marginal interpolant; defaults to 0; - 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 0, i.e. identity; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; defaults to -1; - 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'matchingWeight': weight for pole matching optimization; - 'cutOffTolerance': tolerance for ignoring parasitic poles; - 'cutOffType': rule for tolerance computation for parasitic poles; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'MMarginal': degree of marginal interpolant; - 'polydegreetypeMarginal': type of polynomial degree for marginal; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; - 'interpRcondMarginal': tolerance for marginal interpolation. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. matchingWeight: Weight for pole matching optimization. cutOffTolerance: Tolerance for ignoring parasitic poles. cutOffType: Rule for tolerance computation for parasitic poles. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasisMarginal: Type of polynomial basis for marginal interpolation. MMarginal: Degree of marginal interpolant. polydegreetypeMarginal: Type of polynomial degree for marginal. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. nNearestNeighborMarginal: Number of marginal nearest neighbors considered if polybasisMarginal allows. interpRcondMarginal: Tolerance for marginal interpolation. muBoundsPivot: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, directionPivot : ListAny = [0], - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() if len(directionPivot) > 1: raise RROMPyException(("Exactly 1 pivot parameter allowed in pole " "matching.")) from rrompy.parameter.parameter_sampling import QuadratureSampler as QS QSBase = QS([[0], [1]], "UNIFORM") self._addParametersToList(["matchingWeight", "cutOffTolerance", "cutOffType", "polybasisMarginal", "MMarginal", "polydegreetypeMarginal", "radialDirectionalWeightsMarginal", "nNearestNeighborMarginal", "interpRcondMarginal"], [1, np.inf, "MAGNITUDE", "MONOMIAL", 0, "TOTAL", 1, -1, -1], ["samplerPivot", "SMarginal", "samplerMarginal"], [QSBase, [1], QSBase]) del QS self._directionPivot = directionPivot super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = force_state, verbosity = verbosity, + approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEnginePivotedPOD else: SamplingEngine = SamplingEnginePivoted self.samplingEngine = SamplingEngine(self.HFEngine, self.directionPivot, - force_state = self.force_state, + sample_state = self.approx_state, verbosity = self.verbosity) @property def matchingWeight(self): """Value of matchingWeight.""" return self._matchingWeight @matchingWeight.setter def matchingWeight(self, matchingWeight): self._matchingWeight = matchingWeight self._approxParameters["matchingWeight"] = self.matchingWeight @property def cutOffTolerance(self): """Value of cutOffTolerance.""" return self._cutOffTolerance @cutOffTolerance.setter def cutOffTolerance(self, cutOffTolerance): self._cutOffTolerance = cutOffTolerance self._approxParameters["cutOffTolerance"] = self.cutOffTolerance @property def cutOffType(self): """Value of cutOffType.""" return self._cutOffType @cutOffType.setter def cutOffType(self, cutOffType): try: cutOffType = cutOffType.upper().strip().replace(" ","") if cutOffType not in ["MAGNITUDE", "POTENTIAL"]: raise RROMPyException("Prescribed cutOffType not recognized.") self._cutOffType = cutOffType except: RROMPyWarning(("Prescribed cutOffType not recognized. Overriding " "to 'MAGNITUDE'.")) self._cutOffType = "MAGNITUDE" self._approxParameters["cutOffType"] = self.cutOffType @property def SMarginal(self): """Value of SMarginal.""" return self._SMarginal @SMarginal.setter def SMarginal(self, SMarginal): if SMarginal <= 0: raise RROMPyException("SMarginal must be positive.") if hasattr(self, "_SMarginal") and self._SMarginal is not None: Sold = self.SMarginal else: Sold = -1 self._SMarginal = SMarginal self._approxParameters["SMarginal"] = self.SMarginal if Sold != self.SMarginal: self.resetSamples() @property def polybasisMarginal(self): """Value of polybasisMarginal.""" return self._polybasisMarginal @polybasisMarginal.setter def polybasisMarginal(self, polybasisMarginal): try: polybasisMarginal = polybasisMarginal.upper().strip().replace(" ", "") if polybasisMarginal not in ppb + rbpb + mlspb: raise RROMPyException( "Prescribed marginal polybasis not recognized.") self._polybasisMarginal = polybasisMarginal except: RROMPyWarning(("Prescribed marginal polybasis not recognized. " "Overriding to 'MONOMIAL'.")) self._polybasisMarginal = "MONOMIAL" self._approxParameters["polybasisMarginal"] = self.polybasisMarginal @property def MMarginal(self): """Value of MMarginal.""" return self._MMarginal @MMarginal.setter def MMarginal(self, MMarginal): if MMarginal < 0: raise RROMPyException("MMarginal must be non-negative.") self._MMarginal = MMarginal self._approxParameters["MMarginal"] = self.MMarginal @property def polydegreetypeMarginal(self): """Value of polydegreetypeMarginal.""" return self._polydegreetypeMarginal @polydegreetypeMarginal.setter def polydegreetypeMarginal(self, polydegreetypeM): try: polydegreetypeM = polydegreetypeM.upper().strip().replace(" ","") if polydegreetypeM not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetypeMarginal not " "recognized.")) self._polydegreetypeMarginal = polydegreetypeM except: RROMPyWarning(("Prescribed polydegreetypeMarginal not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetypeMarginal = "TOTAL" self._approxParameters["polydegreetypeMarginal"] = ( self.polydegreetypeMarginal) @property def radialDirectionalWeightsMarginal(self): """Value of radialDirectionalWeightsMarginal.""" return self._radialDirectionalWeightsMarginal @radialDirectionalWeightsMarginal.setter def radialDirectionalWeightsMarginal(self, radialDirWeightsMarginal): self._radialDirectionalWeightsMarginal = radialDirWeightsMarginal self._approxParameters["radialDirectionalWeightsMarginal"] = ( self.radialDirectionalWeightsMarginal) @property def nNearestNeighborMarginal(self): """Value of nNearestNeighborMarginal.""" return self._nNearestNeighborMarginal @nNearestNeighborMarginal.setter def nNearestNeighborMarginal(self, nNearestNeighborMarginal): self._nNearestNeighborMarginal = nNearestNeighborMarginal self._approxParameters["nNearestNeighborMarginal"] = ( self.nNearestNeighborMarginal) @property def interpRcondMarginal(self): """Value of interpRcondMarginal.""" return self._interpRcondMarginal @interpRcondMarginal.setter def interpRcondMarginal(self, interpRcondMarginal): self._interpRcondMarginal = interpRcondMarginal self._approxParameters["interpRcondMarginal"] = ( self.interpRcondMarginal) @property def directionPivot(self): """Value of directionPivot. Its assignment may reset snapshots.""" return self._directionPivot @directionPivot.setter def directionPivot(self, directionPivot): if hasattr(self, '_directionPivot'): directionPivotOld = copy(self.directionPivot) else: directionPivotOld = None if (directionPivotOld is None or len(directionPivot) != len(directionPivotOld) or not directionPivot == directionPivotOld): self.resetSamples() self._directionPivot = directionPivot @property def directionMarginal(self): return [x for x in range(self.HFEngine.npar) \ if x not in self.directionPivot] @property def nparPivot(self): return len(self.directionPivot) @property def nparMarginal(self): return self.npar - self.nparPivot @property def rescalingExpPivot(self): return [self.HFEngine.rescalingExp[x] for x in self.directionPivot] @property def rescalingExpMarginal(self): return [self.HFEngine.rescalingExp[x] for x in self.directionMarginal] @property def muBoundsPivot(self): """Value of muBoundsPivot.""" return self.samplerPivot.lims @property def muBoundsMarginal(self): """Value of muBoundsMarginal.""" return self.samplerMarginal.lims @property def samplerPivot(self): """Value of samplerPivot.""" return self._samplerPivot @samplerPivot.setter def samplerPivot(self, samplerPivot): if 'generatePoints' not in dir(samplerPivot): raise RROMPyException("Pivot sampler type not recognized.") if hasattr(self, '_samplerPivot') and self._samplerPivot is not None: samplerOld = self.samplerPivot self._samplerPivot = samplerPivot self._approxParameters["samplerPivot"] = self.samplerPivot.__str__() if not 'samplerOld' in locals() or samplerOld != self.samplerPivot: self.resetSamples() @property def samplerMarginal(self): """Value of samplerMarginal.""" return self._samplerMarginal @samplerMarginal.setter def samplerMarginal(self, samplerMarginal): if 'generatePoints' not in dir(samplerMarginal): raise RROMPyException("Marginal sampler type not recognized.") if (hasattr(self, '_samplerMarginal') and self._samplerMarginal is not None): samplerOld = self.samplerMarginal self._samplerMarginal = samplerMarginal self._approxParameters["samplerMarginal"] = ( self.samplerMarginal.__str__()) if not 'samplerOld' in locals() or samplerOld != self.samplerMarginal: self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._musMUniqueCN = None self._derMIdxs = None self._reorderM = None def setSamples(self, samplingEngine): """Copy samplingEngine and samples.""" super().setSamples(samplingEngine) self.mus = copy(self.samplingEngine[0].mus) for sEj in self.samplingEngine[1:]: self.mus.append(sEj.mus) def computeSnapshots(self): """Compute snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start snapshot computation.") if self.samplingEngine.nsamplesTot != self.S * self.SMarginal: self.computeScaleFactor() self.resetSamples() vbMng(self, "INIT", "Starting computation of snapshots.", 5) self.HFEngine.buildA() self.HFEngine.buildb() self.musPivot = self.samplerPivot.generatePoints(self.S) self.musMarginal = self.samplerMarginal.generatePoints( self.SMarginal) self.mus = emptyParameterList() self.mus.reset((self.S * self.SMarginal, self.HFEngine.npar)) self.samplingEngine.resetHistory(self.SMarginal) for j, muMarg in enumerate(self.musMarginal): for k in range(j * self.S, (j + 1) * self.S): self.mus.data[k, self.directionPivot] = ( self.musPivot[k - j * self.S].data) self.mus.data[k, self.directionMarginal] = muMarg.data self.samplingEngine.iterSample(self.musPivot, self.musMarginal) if self.POD: self.samplingEngine.coalesceSamples(self.interpRcondMarginal) else: self.samplingEngine.coalesceSamples() vbMng(self, "DEL", "Done computing snapshots.", 5) def _setupMarginalInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if (self._musMUniqueCN is None or len(self._reorderM) != len(self.musMarginal)): self._musMUniqueCN, musMIdxsTo, musMIdxs, musMCount = ( self.trainedModel.centerNormalizeMarginal(self.musMarginal)\ .unique(return_index = True, return_inverse = True, return_counts = True)) self._musMUnique = self.musMarginal[musMIdxsTo] self._derMIdxs = [None] * len(self._musMUniqueCN) self._reorderM = np.empty(len(musMIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musMCount): self._derMIdxs[j] = nextDerivativeIndices([], self.nparMarginal, cnt) jIdx = np.nonzero(musMIdxs == j)[0] self._reorderM[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupMarginalInterp(self): """Compute marginal interpolator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of marginal interpolator.", 7) self._setupMarginalInterpolationIndices() if self.polydegreetypeMarginal == "TOTAL": cfun = totalDegreeN else: cfun = fullDegreeN MM = copy(self.MMarginal) while len(self.musMarginal) < cfun(MM, self.nparMarginal): MM -= 1 if MM < self.MMarginal: RROMPyWarning( ("MMarginal too large compared to SMarginal. " "Reducing MMarginal by {}").format(self.MMarginal - MM)) self.MMarginal = MM mI = [] for j in range(len(self.musMarginal)): canonicalj = 1. * (np.arange(len(self.musMarginal)) == j) self._MMarginal = MM while self.MMarginal >= 0: if self.polybasisMarginal in ppb: p = PI() wellCond, msg = p.setupByInterpolation( self._musMUniqueCN, canonicalj, self.MMarginal, self.polybasisMarginal, self.verbosity >= 5, self.polydegreetypeMarginal == "TOTAL", {"derIdxs": self._derMIdxs, "reorder": self._reorderM, "scl": np.power(self.scaleFactorMarginal, -1.)}, {"rcond": self.interpRcondMarginal}) elif self.polybasisMarginal in rbpb: p = RBI() wellCond, msg = p.setupByInterpolation( self._musMUniqueCN, canonicalj, self.MMarginal, self.polybasisMarginal, self.radialDirectionalWeightsMarginal, self.verbosity >= 5, self.polydegreetypeMarginal == "TOTAL", {"derIdxs": self._derMIdxs, "reorder": self._reorderM, "scl": np.power(self.scaleFactorMarginal, -1.), "nNearestNeighbor" : self.nNearestNeighborMarginal}, {"rcond": self.interpRcondMarginal}) else:# if self.polybasisMarginal in mlspb: p = MLSI() wellCond, msg = p.setupByInterpolation( self._musMUniqueCN, canonicalj, self.MMarginal, self.polybasisMarginal, self.radialDirectionalWeightsMarginal, self.verbosity >= 5, self.polydegreetypeMarginal == "TOTAL", {"derIdxs": self._derMIdxs, "reorder": self._reorderM, "scl": np.power(self.scaleFactorMarginal, -1.), "nNearestNeighbor" : self.nNearestNeighborMarginal}) vbMng(self, "MAIN", msg, 5) if wellCond: break RROMPyWarning(("Polyfit is poorly conditioned. Reducing " "MMarginal by 1.")) self.MMarginal = self.MMarginal - 1 mI = mI + [copy(p)] vbMng(self, "DEL", "Done computing marginal interpolator.", 7) return mI def computeScaleFactor(self): """Compute parameter rescaling factor.""" RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.") self.scaleFactorPivot = .5 * np.abs( self.muBoundsPivot[0] ** self.rescalingExpPivot - self.muBoundsPivot[1] ** self.rescalingExpPivot) self.scaleFactorMarginal = .5 * np.abs( self.muBoundsMarginal[0] ** self.rescalingExpMarginal - self.muBoundsMarginal[1] ** self.rescalingExpMarginal) self.scaleFactor = np.empty(self.npar) self.scaleFactor[self.directionPivot] = self.scaleFactorPivot self.scaleFactor[self.directionMarginal] = self.scaleFactorMarginal diff --git a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py index 40aad79..9b3c244 100644 --- a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py +++ b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py @@ -1,643 +1,644 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from .generic_pivoted_approximant import GenericPivotedApproximant from rrompy.reduction_methods.standard.rational_interpolant import ( RationalInterpolant as RI) from rrompy.utilities.poly_fitting.polynomial import ( polybases as ppb, polyfitname, polyvander as pvP, polyvanderTotal as pvTP, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( polybases as mlspb, MovingLeastSquaresInterpolator as MLSI) from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple, List, ListAny, paramVal) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import (multifactorial, customPInv, dot, fullDegreeN, totalDegreeN, degreeTotalToFull, fullDegreeMaxMask, totalDegreeMaxMask, nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameter __all__ = ['RationalInterpolantPivoted'] class RationalInterpolantPivoted(GenericPivotedApproximant): """ ROM pivoted rational interpolant (with pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'cutOffTolerance': tolerance for ignoring parasitic poles; defaults to np.inf; - 'cutOffType': rule for tolerance computation for parasitic poles; defaults to 'MAGNITUDE'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasisPivot': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'MMarginal': degree of marginal interpolant; defaults to 0; - 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; - 'radialDirectionalWeightsPivot': radial basis weights for pivot numerator; defaults to 0, i.e. identity; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 0, i.e. identity; - 'nNearestNeighborPivot': number of pivot nearest neighbors considered if polybasisPivot allows; defaults to -1; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; defaults to -1; - 'interpRcondPivot': tolerance for pivot interpolation; defaults to None; - 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'matchingWeight': weight for pole matching optimization; - 'cutOffTolerance': tolerance for ignoring parasitic poles; - 'cutOffType': rule for tolerance computation for parasitic poles; - 'polybasisPivot': type of polynomial basis for pivot interpolation; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'MMarginal': degree of marginal interpolant; - 'polydegreetypeMarginal': type of polynomial degree for marginal; - 'radialDirectionalWeightsPivot': radial basis weights for pivot numerator; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'nNearestNeighborPivot': number of pivot nearest neighbors considered if polybasisPivot allows; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; - 'interpRcondPivot': tolerance for pivot interpolation; - 'interpRcondMarginal': tolerance for marginal interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. matchingWeight: Weight for pole matching optimization. cutOffTolerance: Tolerance for ignoring parasitic poles. cutOffType: Rule for tolerance computation for parasitic poles. S: Total number of pivot samples current approximant relies upon. sampler: Pivot sample point generator. polybasisPivot: Type of polynomial basis for pivot interpolation. polybasisMarginal: Type of polynomial basis for marginal interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. MMarginal: Degree of marginal interpolant. polydegreetypeMarginal: Type of polynomial degree for marginal. radialDirectionalWeightsPivot: Radial basis weights for pivot numerator. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. nNearestNeighborPivot: Number of pivot nearest neighbors considered if polybasisPivot allows. nNearestNeighborMarginal: Number of marginal nearest neighbors considered if polybasisMarginal allows. interpRcondPivot: Tolerance for pivot interpolation. interpRcondMarginal: Tolerance for marginal interpolation. robustTol: Tolerance for robust rational denominator management. muBoundsPivot: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, directionPivot : ListAny = [0], - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["polybasisPivot", "M", "N", "polydegreetype", "radialDirectionalWeightsPivot", "nNearestNeighborPivot", "interpRcondPivot", "robustTol"], ["MONOMIAL", 0, 0, "TOTAL", 1, -1, -1, 0]) super().__init__(HFEngine = HFEngine, mu0 = mu0, directionPivot = directionPivot, approxParameters = approxParameters, - force_state = force_state, verbosity = verbosity, + approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def tModelType(self): from rrompy.reduction_methods.trained_model import \ TrainedModelPivotedRational return TrainedModelPivotedRational def initializeModelData(self, datadict): from rrompy.reduction_methods.trained_model import \ TrainedModelPivotedData return (TrainedModelPivotedData(datadict["mu0"], datadict.pop("projMat"), datadict["scaleFactor"], datadict.pop("rescalingExp"), datadict["directionPivot"]), ["mu0", "scaleFactor", "directionPivot", "mus"]) @property def polybasisPivot(self): """Value of polybasisPivot.""" return self._polybasisPivot @polybasisPivot.setter def polybasisPivot(self, polybasisPivot): try: polybasisPivot = polybasisPivot.upper().strip().replace(" ","") if polybasisPivot not in ppb + rbpb + mlspb: raise RROMPyException( "Prescribed pivot polybasis not recognized.") self._polybasisPivot = polybasisPivot except: RROMPyWarning(("Prescribed pivot polybasis not recognized. " "Overriding to 'MONOMIAL'.")) self._polybasisPivot = "MONOMIAL" self._approxParameters["polybasisPivot"] = self.polybasisPivot @property def polybasisPivot0(self): if "_" in self.polybasisPivot: return self.polybasisPivot.split("_")[0] return self.polybasisPivot @property def radialDirectionalWeightsPivot(self): """Value of radialDirectionalWeightsPivot.""" return self._radialDirectionalWeightsPivot @radialDirectionalWeightsPivot.setter def radialDirectionalWeightsPivot(self, radialDirectionalWeightsPivot): self._radialDirectionalWeightsPivot = radialDirectionalWeightsPivot self._approxParameters["radialDirectionalWeightsPivot"] = ( self.radialDirectionalWeightsPivot) @property def nNearestNeighborPivot(self): """Value of nNearestNeighborPivot.""" return self._nNearestNeighborPivot @nNearestNeighborPivot.setter def nNearestNeighborPivot(self, nNearestNeighborPivot): self._nNearestNeighborPivot = nNearestNeighborPivot self._approxParameters["nNearestNeighborPivot"] = ( self.nNearestNeighborPivot) @property def interpRcondPivot(self): """Value of interpRcondPivot.""" return self._interpRcondPivot @interpRcondPivot.setter def interpRcondPivot(self, interpRcondPivot): self._interpRcondPivot = interpRcondPivot self._approxParameters["interpRcondPivot"] = self.interpRcondPivot @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N @property def polydegreetype(self): """Value of polydegreetype.""" return self._polydegreetype @polydegreetype.setter def polydegreetype(self, polydegreetype): try: polydegreetype = polydegreetype.upper().strip().replace(" ","") if polydegreetype not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetype not " "recognized.")) self._polydegreetype = polydegreetype except: RROMPyWarning(("Prescribed polydegreetype not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetype = "TOTAL" self._approxParameters["polydegreetype"] = self.polydegreetype @property def robustTol(self): """Value of tolerance for robust rational denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol def resetSamples(self): """Reset samples.""" super().resetSamples() self._musPUniqueCN = None self._derPIdxs = None self._reorderP = None def _setupPivotInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if (self._musPUniqueCN is None or len(self._reorderP) != len(self.musPivot)): self._musPUniqueCN, musPIdxsTo, musPIdxs, musPCount = ( self.trainedModel.centerNormalizePivot(self.musPivot).unique( return_index = True, return_inverse = True, return_counts = True)) self._musPUnique = self.mus[musPIdxsTo] self._derPIdxs = [None] * len(self._musPUniqueCN) self._reorderP = np.empty(len(musPIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musPCount): self._derPIdxs[j] = nextDerivativeIndices([], self.nparPivot, cnt) jIdx = np.nonzero(musPIdxs == j)[0] self._reorderP[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator.", 7) NinvD = None N0 = copy(self.N) qs = [] self.verbosity -= 10 for j in range(len(self.musMarginal)): self._N = N0 while self.N > 0: if NinvD != self.N: invD, fitinvP = self._computeInterpolantInverseBlocks() NinvD = self.N if self.POD: ev, eV = RI.findeveVGQR(self, self.samplingEngine.RPOD[j], invD) else: ev, eV = RI.findeveVGExplicit(self, self.samplingEngine.samples[j], invD) nevBad = checkRobustTolerance(ev, self.robustTol) if nevBad <= 1: break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: eigenproblem is " "poorly conditioned.")) RROMPyWarning(("Smallest {} eigenvalues below tolerance. " "Reducing N by 1.").format(nevBad)) self.N = self.N - 1 if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) q = PI() q.npar = self.nparPivot q.polybasis = self.polybasisPivot0 if self.polydegreetype == "TOTAL": q.coeffs = degreeTotalToFull(tuple([self.N + 1] * q.npar), q.npar, eV[:, 0]) else: q.coeffs = eV[:, 0].reshape([self.N + 1] * q.npar) qs = qs + [copy(q)] self.verbosity += 10 vbMng(self, "DEL", "Done computing denominator.", 7) return qs, fitinvP def _setupNumerator(self): """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of numerator.", 7) Qevaldiag = np.zeros((len(self.musPivot), len(self.musPivot)), dtype = np.complex) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 self._setupPivotInterpolationIndices() cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN M = copy(self.M) while len(self.musPivot) < cfun(M, self.nparPivot): M -= 1 if M < self.M: RROMPyWarning(("M too large compared to S. Reducing M by " "{}").format(self.M - M)) self.M = M tensor_idx = 0 ps = [] for k, muM in enumerate(self.musMarginal): self._M = M idxGlob = 0 for j, derIdxs in enumerate(self._derPIdxs): mujEff = [fp] * self.npar for jj, kk in enumerate(self.directionPivot): mujEff[kk] = self._musPUnique[j, jj] for jj, kk in enumerate(self.directionMarginal): mujEff[kk] = muM(0, jj) mujEff = checkParameter(mujEff, self.npar) nder = len(derIdxs) idxLoc = np.arange(len(self.musPivot))[ (self._reorderP >= idxGlob) * (self._reorderP < idxGlob + nder)] idxGlob += nder Qval = [0] * nder for der in range(nder): derIdx = hashI(der, self.nparPivot) derIdxEff = [0] * self.npar sclEff = [0] * self.npar for jj, kk in enumerate(self.directionPivot): derIdxEff[kk] = derIdx[jj] sclEff[kk] = self.scaleFactorPivot[jj] ** -1. Qval[der] = (self.trainedModel.getQVal(mujEff, derIdxEff, scl = sclEff) / multifactorial(derIdx)) for derU, derUIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)] if all([x >= 0 for x in diffIdx]): diffj = hashD(diffIdx) Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj] while self.M >= 0: if self.polybasisPivot in ppb: p = PI() wellCond, msg = p.setupByInterpolation( self._musPUniqueCN, Qevaldiag, self.M, self.polybasisPivot, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derPIdxs, "reorder": self._reorderP, "scl": np.power(self.scaleFactorPivot, -1.)}, {"rcond": self.interpRcondPivot}) elif self.polybasisPivot in rbpb: p = RBI() wellCond, msg = p.setupByInterpolation( self._musPUniqueCN, Qevaldiag, self.M, self.polybasisPivot, self.radialDirectionalWeightsPivot, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derPIdxs, "reorder": self._reorderP, "scl": np.power(self.scaleFactorPivot, -1.), "nNearestNeighbor" : self.nNearestNeighborPivot}, {"rcond": self.interpRcondPivot}) else:# if self.polybasisPivot in mlspb: p = MLSI() wellCond, msg = p.setupByInterpolation( self._musPUniqueCN, Qevaldiag, self.M, self.polybasisPivot, self.radialDirectionalWeightsPivot, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derPIdxs, "reorder": self._reorderP, "scl": np.power(self.scaleFactorPivot, -1.), "nNearestNeighbor" : self.nNearestNeighborPivot}) vbMng(self, "MAIN", msg, 5) if wellCond: break if self.catchInstability: raise RROMPyException(("Instability in numerator " "computation: polyfit is " "poorly conditioned.")) RROMPyWarning(("Polyfit is poorly conditioned. " "Reducing M by 1.")) self.M = self.M - 1 tensor_idx_new = tensor_idx + Qevaldiag.shape[1] if self.POD: p.postmultiplyTensorize(self.samplingEngine.RPODCoalesced.T[ tensor_idx : tensor_idx_new, :]) else: p.pad(tensor_idx, len(self.mus) - tensor_idx_new) tensor_idx = tensor_idx_new ps = ps + [copy(p)] self.trainedModel.verbosity = verb vbMng(self, "DEL", "Done computing numerator.", 7) return ps def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() pMat = self.samplingEngine.samplesCoalesced.data - pMatEff = dot(self.HFEngine.C, pMat) if self.force_state else pMat + pMatEff = dot(self.HFEngine.C, pMat) if self.approx_state else pMat if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp, "directionPivot": self.directionPivot} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.musPivot = copy(self.musPivot) self.trainedModel.data.musMarginal = copy(self.musMarginal) self.trainedModel.data.marginalInterp = self._setupMarginalInterp() if self.N > 0: Qs = self._setupDenominator()[0] else: Q = PI() Q.npar = self.nparPivot Q.coeffs = np.ones(tuple([1] * Q.npar), dtype = self.musMarginal.dtype) Q.polybasis = self.polybasisPivot0 Qs = [Q for _ in range(len(self.musMarginal))] self.trainedModel.data._temporary = 1 self.trainedModel.data.Qs = Qs self.trainedModel.data.Ps = self._setupNumerator() vbMng(self, "INIT", "Matching poles.", 10) self.trainedModel.initializeFromRational(self.HFEngine, self.matchingWeight, self.POD, - self.force_state) + self.approx_state) del self.trainedModel.data._temporary vbMng(self, "DEL", "Done matching poles.", 10) if not np.isinf(self.cutOffTolerance): vbMng(self, "INIT", "Recompressing by cut-off.", 10) msg = self.trainedModel.recompressByCutOff([-1., 1.], self.cutOffTolerance, self.cutOffType) vbMng(self, "DEL", "Done recompressing." + msg, 10) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupPivotInterpolationIndices() cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN N = copy(self.N) while len(self.musPivot) < cfun(N, self.nparPivot): N -= 1 if N < self.N: RROMPyWarning(("N too large compared to S. Reducing N by " "{}").format(self.N - N)) self.N = N while self.N >= 0: if self.polydegreetype == "TOTAL": TE, _, argIdxs = pvTP(self._musPUniqueCN, self.N, self.polybasisPivot0, self._derPIdxs, self._reorderP, scl = np.power(self.scaleFactorPivot, -1.)) TE = TE[:, argIdxs] idxsB = totalDegreeMaxMask(self.N, self.nparPivot) else: #if self.polydegreetype == "FULL": TE = pvP(self._musPUniqueCN, [self.N] * self.nparPivot, self.polybasisPivot0, self._derPIdxs, self._reorderP, scl = np.power(self.scaleFactorPivot, -1.)) idxsB = fullDegreeMaxMask(self.N, self.nparPivot) fitOut = customPInv(TE, rcond = self.interpRcondPivot, full = True) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... " "Conditioning of pseudoinverse system: {:.4e}.").format( TE.shape[0], self.N, polyfitname(self.polybasisPivot0), fitOut[1][1][0] / fitOut[1][1][-1]), 5) if fitOut[1][0] == TE.shape[1]: fitinvP = fitOut[0][idxsB, :] break RROMPyWarning("Polyfit is poorly conditioned. Reducing N by 1.") self.N -= 1 if self.N < 0: raise RROMPyException(("Instability in computation of " "denominator. Aborting.")) TN, _, argIdxs = pvTP(self._musPUniqueCN, self.N, self.polybasisPivot0, self._derPIdxs, self._reorderP, scl = np.power(self.scaleFactorPivot, -1.)) TN = TN[:, argIdxs] invD = [None] * (len(idxsB)) for k in range(len(idxsB)): pseudoInv = np.diag(fitinvP[k, :]) idxGlob = 0 for j, derIdxs in enumerate(self._derPIdxs): nder = len(derIdxs) idxGlob += nder if nder > 1: idxLoc = np.arange(len(self.musPivot))[ (self._reorderP >= idxGlob - nder) * (self._reorderP < idxGlob)] invLoc = fitinvP[k, idxLoc] pseudoInv[np.ix_(idxLoc, idxLoc)] = 0. for diffj, diffjIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): derUIdx = [x - y for (x, y) in zip(diffjIdx, derQIdx)] if all([x >= 0 for x in derUIdx]): derU = hashD(derUIdx) pseudoInv[idxLoc[derU], idxLoc[derQ]] = ( invLoc[diffj]) invD[k] = dot(pseudoInv, TN) return invD, fitinvP def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues(*args, **kwargs) diff --git a/rrompy/reduction_methods/standard/generic_standard_approximant.py b/rrompy/reduction_methods/standard/generic_standard_approximant.py index 01056da..a403e4c 100644 --- a/rrompy/reduction_methods/standard/generic_standard_approximant.py +++ b/rrompy/reduction_methods/standard/generic_standard_approximant.py @@ -1,147 +1,148 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.utilities.base.types import DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameterList __all__ = ['GenericStandardApproximant'] class GenericStandardApproximant(GenericApproximant): """ ROM interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() from rrompy.parameter.parameter_sampling import QuadratureSampler as QS self._addParametersToList([], [], ["sampler"], [QS([[0], [1]], "UNIFORM")]) del QS super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = force_state, verbosity = verbosity, + approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def mus(self): """Value of mus. Its assignment may reset snapshots.""" return self._mus @mus.setter def mus(self, mus): mus = checkParameterList(mus, self.npar)[0] musOld = copy(self.mus) if hasattr(self, '_mus') else None if (musOld is None or len(mus) != len(musOld) or not mus == musOld): self.resetSamples() self._mus = mus @property def muBounds(self): """Value of muBounds.""" return self.sampler.lims @property def sampler(self): """Value of sampler.""" return self._sampler @sampler.setter def sampler(self, sampler): if 'generatePoints' not in dir(sampler): raise RROMPyException("Sampler type not recognized.") if hasattr(self, '_sampler') and self._sampler is not None: samplerOld = self.sampler self._sampler = sampler self._approxParameters["sampler"] = self.sampler.__str__() if not 'samplerOld' in locals() or samplerOld != self.sampler: self.resetSamples() def setSamples(self, samplingEngine): """Copy samplingEngine and samples.""" super().setSamples(samplingEngine) self.mus = copy(self.samplingEngine.mus) def computeSnapshots(self): """Compute snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start snapshot computation.") if self.samplingEngine.nsamples != self.S: self.computeScaleFactor() vbMng(self, "INIT", "Starting computation of snapshots.", 5) self.mus = self.sampler.generatePoints(self.S) self.samplingEngine.iterSample(self.mus) vbMng(self, "DEL", "Done computing snapshots.", 5) def computeScaleFactor(self): """Compute parameter rescaling factor.""" RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.") self.scaleFactor = .5 * np.abs( self.muBounds[0] ** self.HFEngine.rescalingExp - self.muBounds[1] ** self.HFEngine.rescalingExp) diff --git a/rrompy/reduction_methods/standard/rational_interpolant.py b/rrompy/reduction_methods/standard/rational_interpolant.py index c515f92..a5f9743 100644 --- a/rrompy/reduction_methods/standard/rational_interpolant.py +++ b/rrompy/reduction_methods/standard/rational_interpolant.py @@ -1,568 +1,569 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from .generic_standard_approximant import GenericStandardApproximant from rrompy.utilities.poly_fitting.polynomial import ( polybases as ppb, polyfitname, polyvander as pvP, polyvanderTotal as pvTP, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( polybases as mlspb, MovingLeastSquaresInterpolator as MLSI) from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple, List, paramVal, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import (multifactorial, customPInv, dot, fullDegreeN, totalDegreeN, degreeTotalToFull, fullDegreeMaxMask, totalDegreeMaxMask, nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalInterpolant'] class RationalInterpolant(GenericStandardApproximant): """ ROM rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; defaults to 0, i.e. identity; - 'nNearestNeighbor': mumber of nearest neighbors considered in numerator if polybasis allows; defaults to -1; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; - 'nNearestNeighbor': mumber of nearest neighbors considered in numerator if polybasis allows; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. radialDirectionalWeights: Radial basis weights for interpolant numerator. nNearestNeighbor: Number of nearest neighbors considered in numerator if polybasis allows. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["polybasis", "M", "N", "polydegreetype", "radialDirectionalWeights", "nNearestNeighbor", "interpRcond", "robustTol"], ["MONOMIAL", 0, 0, "TOTAL", 1, -1, -1, 0]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = force_state, verbosity = verbosity, + approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self.catchInstability = False self._postInit() @property def tModelType(self): from rrompy.reduction_methods.trained_model import TrainedModelRational return TrainedModelRational @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in ppb + rbpb + mlspb: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def polybasis0(self): if "_" in self.polybasis: return self.polybasis.split("_")[0] return self.polybasis @property def interpRcond(self): """Value of interpRcond.""" return self._interpRcond @interpRcond.setter def interpRcond(self, interpRcond): self._interpRcond = interpRcond self._approxParameters["interpRcond"] = self.interpRcond @property def radialDirectionalWeights(self): """Value of radialDirectionalWeights.""" return self._radialDirectionalWeights @radialDirectionalWeights.setter def radialDirectionalWeights(self, radialDirectionalWeights): self._radialDirectionalWeights = radialDirectionalWeights self._approxParameters["radialDirectionalWeights"] = ( self.radialDirectionalWeights) @property def nNearestNeighbor(self): """Value of nNearestNeighbor.""" return self._nNearestNeighbor @nNearestNeighbor.setter def nNearestNeighbor(self, nNearestNeighbor): self._nNearestNeighbor = nNearestNeighbor self._approxParameters["nNearestNeighbor"] = self.nNearestNeighbor @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N @property def polydegreetype(self): """Value of polydegreetype.""" return self._polydegreetype @polydegreetype.setter def polydegreetype(self, polydegreetype): try: polydegreetype = polydegreetype.upper().strip().replace(" ","") if polydegreetype not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetype not " "recognized.")) self._polydegreetype = polydegreetype except: RROMPyWarning(("Prescribed polydegreetype not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetype = "TOTAL" self._approxParameters["polydegreetype"] = self.polydegreetype @property def robustTol(self): """Value of tolerance for robust rational denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol def resetSamples(self): """Reset samples.""" super().resetSamples() self._musUniqueCN = None self._derIdxs = None self._reorder = None def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if self._musUniqueCN is None or len(self._reorder) != len(self.mus): self._musUniqueCN, musIdxsTo, musIdxs, musCount = ( self.trainedModel.centerNormalize(self.mus).unique( return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.mus[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1], cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator.", 7) while self.N > 0: invD, fitinv = self._computeInterpolantInverseBlocks() if self.POD: ev, eV = self.findeveVGQR(self.samplingEngine.RPOD, invD) else: ev, eV = self.findeveVGExplicit(self.samplingEngine.samples, invD) nevBad = checkRobustTolerance(ev, self.robustTol) if nevBad <= 1: break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: eigenproblem is poorly " "conditioned.")) RROMPyWarning(("Smallest {} eigenvalues below tolerance. Reducing " "N by 1.").format(nevBad)) self.N = self.N - 1 if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) q = PI() q.npar = self.npar q.polybasis = self.polybasis0 if self.polydegreetype == "TOTAL": q.coeffs = degreeTotalToFull(tuple([self.N + 1] * self.npar), self.npar, eV[:, 0]) else: q.coeffs = eV[:, 0].reshape([self.N + 1] * self.npar) vbMng(self, "DEL", "Done computing denominator.", 7) return q, fitinv def _setupNumerator(self): """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of numerator.", 7) Qevaldiag = np.zeros((len(self.mus), len(self.mus)), dtype = np.complex) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 self._setupInterpolationIndices() idxGlob = 0 for j, derIdxs in enumerate(self._derIdxs): nder = len(derIdxs) idxLoc = np.arange(len(self.mus))[(self._reorder >= idxGlob) * (self._reorder < idxGlob + nder)] idxGlob += nder Qval = [0] * nder for der in range(nder): derIdx = hashI(der, self.npar) Qval[der] = (self.trainedModel.getQVal( self._musUnique[j], derIdx, scl = np.power(self.scaleFactor, -1.)) / multifactorial(derIdx)) for derU, derUIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)] if all([x >= 0 for x in diffIdx]): diffj = hashD(diffIdx) Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj] if self.POD: Qevaldiag = Qevaldiag.dot(self.samplingEngine.RPOD.T) self.trainedModel.verbosity = verb cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN M = copy(self.M) while len(self.mus) < cfun(M, self.npar): M -= 1 if M < self.M: RROMPyWarning(("M too large compared to S. Reducing M by " "{}").format(self.M - M)) self.M = M while self.M >= 0: if self.polybasis in ppb: p = PI() wellCond, msg = p.setupByInterpolation( self._musUniqueCN, Qevaldiag, self.M, self.polybasis, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": np.power(self.scaleFactor, -1.)}, {"rcond": self.interpRcond}) elif self.polybasis in rbpb: p = RBI() wellCond, msg = p.setupByInterpolation( self._musUniqueCN, Qevaldiag, self.M, self.polybasis, self.radialDirectionalWeights, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": np.power(self.scaleFactor, -1.), "nNearestNeighbor": self.nNearestNeighbor}, {"rcond": self.interpRcond}) else:# if self.polybasis in mlspb: p = MLSI() wellCond, msg = p.setupByInterpolation( self._musUniqueCN, Qevaldiag, self.M, self.polybasis, self.radialDirectionalWeights, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": np.power(self.scaleFactor, -1.), "nNearestNeighbor": self.nNearestNeighbor}) vbMng(self, "MAIN", msg, 5) if wellCond: break if self.catchInstability: raise RROMPyException(("Instability in numerator computation: " "polyfit is poorly conditioned.")) RROMPyWarning("Polyfit is poorly conditioned. Reducing M by 1.") self.M = self.M - 1 if self.M < 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) vbMng(self, "DEL", "Done computing numerator.", 7) return p def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() pMat = self.samplingEngine.samples.data - pMatEff = dot(self.HFEngine.C, pMat) if self.force_state else pMat + pMatEff = dot(self.HFEngine.C, pMat) if self.approx_state else pMat if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) if self.N > 0: Q = self._setupDenominator()[0] else: Q = PI() Q.coeffs = np.ones(tuple([1] * self.npar), dtype = np.complex) Q.npar = self.npar Q.polybasis = self.polybasis self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.Q = Q self.trainedModel.data.P = self._setupNumerator() self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupInterpolationIndices() cfun = totalDegreeN if self.polydegreetype == "TOTAL" else fullDegreeN N = copy(self.N) while len(self.mus) < cfun(N, self.npar): N -= 1 if N < self.N: RROMPyWarning(("N too large compared to S. Reducing N by " "{}").format(self.N - N)) self.N = N while self.N >= 0: if self.polydegreetype == "TOTAL": TE, _, argIdxs = pvTP(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TE = TE[:, argIdxs] idxsB = totalDegreeMaxMask(self.N, self.npar) else: #if self.polydegreetype == "FULL": TE = pvP(self._musUniqueCN, [self.N] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) idxsB = fullDegreeMaxMask(self.N, self.npar) fitOut = customPInv(TE, rcond = self.interpRcond, full = True) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... " "Conditioning of pseudoinverse system: {:.4e}.").format( TE.shape[0], self.N, polyfitname(self.polybasis0), fitOut[1][1][0] / fitOut[1][1][-1]), 5) if fitOut[1][0] == TE.shape[1]: fitinv = fitOut[0][idxsB, :] break if self.catchInstability: raise RROMPyException(("Instability in denominator " "computation: polyfit is poorly " "conditioned.")) RROMPyWarning("Polyfit is poorly conditioned. Reducing N by 1.") self.N = self.N - 1 if self.polydegreetype == "TOTAL": TN, _, argIdxs = pvTP(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TN = TN[:, argIdxs] else: #if self.polydegreetype == "FULL": TN = pvP(self._musUniqueCN, [self.N] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) invD = [None] * (len(idxsB)) for k in range(len(idxsB)): pseudoInv = np.diag(fitinv[k, :]) idxGlob = 0 for j, derIdxs in enumerate(self._derIdxs): nder = len(derIdxs) idxGlob += nder if nder > 1: idxLoc = np.arange(len(self.mus))[ (self._reorder >= idxGlob - nder) * (self._reorder < idxGlob)] invLoc = fitinv[k, idxLoc] pseudoInv[np.ix_(idxLoc, idxLoc)] = 0. for diffj, diffjIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): derUIdx = [x - y for (x, y) in zip(diffjIdx, derQIdx)] if all([x >= 0 for x in derUIdx]): derU = hashD(derUIdx) pseudoInv[idxLoc[derU], idxLoc[derQ]] = ( invLoc[diffj]) invD[k] = dot(pseudoInv, TN) return invD, fitinv def findeveVGExplicit(self, sampleE:sampList, invD:List[Np2D]) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of rational denominator matrix. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") nEnd = invD[0].shape[1] eWidth = len(invD) vbMng(self, "INIT", "Building gramian matrix.", 10) gramian = self.HFEngine.innerProduct(sampleE, sampleE, - is_state = self.force_state) + is_state = self.approx_state) G = np.zeros((nEnd, nEnd), dtype = np.complex) for k in range(eWidth): G += dot(dot(gramian, invD[k]).T, invD[k].conj()).T vbMng(self, "DEL", "Done building gramian.", 10) vbMng(self, "INIT", "Solving eigenvalue problem for gramian matrix.", 7) ev, eV = np.linalg.eigh(G) vbMng(self, "MAIN", ("Solved eigenvalue problem of size {} with condition number " "{:.4e}.").format(nEnd, ev[-1] / ev[0]), 5) vbMng(self, "DEL", "Done solving eigenvalue problem.", 7) return ev, eV def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of rational denominator matrix through SVD of R factor. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") nEnd = invD[0].shape[1] S = RPODE.shape[0] eWidth = len(invD) vbMng(self, "INIT", "Building half-gramian matrix stack.", 10) Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex) for k in range(eWidth): Rstack[k * S : (k + 1) * S, :] = dot(RPODE, invD[k]) vbMng(self, "DEL", "Done building half-gramian.", 10) vbMng(self, "INIT", "Solving svd for square root of gramian matrix.", 7) _, s, eV = np.linalg.svd(Rstack, full_matrices = False) ev = s[::-1] eV = eV[::-1, :].T.conj() vbMng(self, "MAIN", ("Solved svd problem of size {} x {} with condition number " "{:.4e}.").format(*Rstack.shape, s[0] / s[-1]), 5) vbMng(self, "DEL", "Done solving svd.", 7) return ev, eV def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues(*args, **kwargs) diff --git a/rrompy/reduction_methods/standard/rational_moving_least_squares.py b/rrompy/reduction_methods/standard/rational_moving_least_squares.py index 4093a9b..4fc7629 100644 --- a/rrompy/reduction_methods/standard/rational_moving_least_squares.py +++ b/rrompy/reduction_methods/standard/rational_moving_least_squares.py @@ -1,311 +1,312 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .rational_interpolant import RationalInterpolant from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb, polyvander as pvP, polyvanderTotal as pvTP) from rrompy.utilities.base.types import Np2D, HFEng, DictAny, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import (fullDegreeMaxMask, totalDegreeMaxMask, dot) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalMovingLeastSquares'] class RationalMovingLeastSquares(RationalInterpolant): """ ROM rational moving LS interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'radialBasis': numerator radial basis type; defaults to 'GAUSSIAN'; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; defaults to 0, i.e. identity; - 'nNearestNeighbor': number of nearest neighbors considered in numerator if radialBasis allows; defaults to -1; - 'radialBasisDen': denominator radial basis type; defaults to 'GAUSSIAN'; - 'radialDirectionalWeightsDen': radial basis weights for interpolant denominator; defaults to 0, i.e. identity; - 'nNearestNeighborDen': number of nearest neighbors considered in denominator if radialBasisDen allows; defaults to -1; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults to False. + approx_state(optional): Whether to approximate state. Defaults to + False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'radialBasis': numerator radial basis type; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; - 'nNearestNeighbor': number of nearest neighbors considered in numerator if radialBasis allows; - 'radialBasisDen': denominator radial basis type; - 'radialDirectionalWeightsDen': radial basis weights for interpolant denominator; - 'nNearestNeighborDen': number of nearest neighbors considered in denominator if radialBasisDen allows; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. radialBasis: Numerator radial basis type. radialDirectionalWeights: Radial basis weights for interpolant numerator. nNearestNeighbor: Number of nearest neighbors considered in numerator if radialBasis allows. radialBasisDen: Denominator radial basis type. radialDirectionalWeightsDen: Radial basis weights for interpolant denominator. nNearestNeighborDen: Number of nearest neighbors considered in denominator if radialBasisDen allows. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = False, + approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["radialBasis", "radialBasisDen", "radialDirectionalWeightsDen", "nNearestNeighborDen"], ["GAUSSIAN", "GAUSSIAN", 1, -1]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = force_state, verbosity = verbosity, + approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self.catchInstability = False self._postInit() @property def tModelType(self): from rrompy.reduction_methods.trained_model import \ TrainedModelRationalMLS return TrainedModelRationalMLS @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in ppb: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def radialBasis(self): """Value of radialBasis.""" return self._radialBasis @radialBasis.setter def radialBasis(self, radialBasis): self._radialBasis = radialBasis self._approxParameters["radialBasis"] = self.radialBasis @property def radialBasisDen(self): """Value of radialBasisDen.""" return self._radialBasisDen @radialBasisDen.setter def radialBasisDen(self, radialBasisDen): self._radialBasisDen = radialBasisDen self._approxParameters["radialBasisDen"] = self.radialBasisDen @property def radialDirectionalWeightsDen(self): """Value of radialDirectionalWeightsDen.""" return self._radialDirectionalWeightsDen @radialDirectionalWeightsDen.setter def radialDirectionalWeightsDen(self, radialDirectionalWeightsDen): self._radialDirectionalWeightsDen = radialDirectionalWeightsDen self._approxParameters["radialDirectionalWeightsDen"] = ( self.radialDirectionalWeightsDen) @property def nNearestNeighborDen(self): """Value of nNearestNeighborDen.""" return self._nNearestNeighborDen @nNearestNeighborDen.setter def nNearestNeighborDen(self, nNearestNeighborDen): self._nNearestNeighborDen = nNearestNeighborDen self._approxParameters["nNearestNeighborDen"] = ( self.nNearestNeighborDen) def _setupDenominator(self) -> Np2D: """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator-related blocks.", 7) self._setupInterpolationIndices() if self.polydegreetype == "TOTAL": TN, _, argIdxs = pvTP(self._musUniqueCN, self.N, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TN = TN[:, argIdxs] else: #if self.polydegreetype == "FULL": TN = pvP(self._musUniqueCN, [self.N] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TNTen = np.zeros((self.S, self.S, TN.shape[1]), dtype = TN.dtype) TNTen[np.arange(self.S), np.arange(self.S)] = TN if self.POD: TNTen = dot(self.samplingEngine.RPOD, TNTen) vbMng(self, "DEL", "Done computing denominator-related blocks.", 7) return TN, TNTen def _setupNumerator(self) -> Np2D: """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of denominator-related blocks.", 7) self._setupInterpolationIndices() if self.polydegreetype == "TOTAL": TM, _, argIdxs = pvTP(self._musUniqueCN, self.M, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TM = TM[:, argIdxs] else: #if self.polydegreetype == "FULL": TM = pvP(self._musUniqueCN, [self.M] * self.npar, self.polybasis0, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) vbMng(self, "DEL", "Done computing denominator-related blocks.", 7) return TM def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() pMat = self.samplingEngine.samples.data - pMatEff = dot(self.HFEngine.C, pMat) if self.force_state else pMat + pMatEff = dot(self.HFEngine.C, pMat) if self.approx_state else pMat if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} data = self.initializeModelData(datadict)[0] data.POD = self.POD data.polybasis = self.polybasis data.polydegreetype = self.polydegreetype data.radialBasis = self.radialBasis data.radialWeights = self.radialDirectionalWeights data.nNearestNeighbor = self.nNearestNeighbor data.radialBasisDen = self.radialBasisDen data.radialWeightsDen = self.radialDirectionalWeightsDen data.nNearestNeighborDen = self.nNearestNeighborDen data.interpRcond = self.interpRcond self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) if not self.POD: self.trainedModel.data.gramian = self.HFEngine.innerProduct( - self.samplingEngine.samples, - self.samplingEngine.samples, - is_state = self.force_state) + self.samplingEngine.samples, + self.samplingEngine.samples, + is_state = self.approx_state) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.M = self.M self.trainedModel.data.N = self.N QVan, self.trainedModel.data.QBlocks = self._setupDenominator() self.trainedModel.data.PVan = self._setupNumerator() if self.polydegreetype == "TOTAL": degreeMaxMask = totalDegreeMaxMask else: #if self.polydegreetype == "FULL": degreeMaxMask = fullDegreeMaxMask if self.N > self.M: self.trainedModel.data.QVan = QVan self.trainedModel.data.domQIdxs = degreeMaxMask(self.N, self.npar) else: self.trainedModel.data.domQIdxs = degreeMaxMask(self.M, self.npar) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) diff --git a/rrompy/reduction_methods/standard/reduced_basis.py b/rrompy/reduction_methods/standard/reduced_basis.py index 8df607e..ec07db6 100644 --- a/rrompy/reduction_methods/standard/reduced_basis.py +++ b/rrompy/reduction_methods/standard/reduced_basis.py @@ -1,214 +1,214 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_standard_approximant import GenericStandardApproximant from rrompy.reduction_methods.base.reduced_basis_utils import \ projectAffineDecomposition from rrompy.utilities.base.types import (Np1D, Np2D, List, Tuple, DictAny, HFEng, paramVal, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert) __all__ = ['ReducedBasis'] class ReducedBasis(GenericStandardApproximant): """ ROM RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'R': rank for Galerkin projection; defaults to S; - 'PODTolerance': tolerance for snapshots POD; defaults to -1. Defaults to empty dict. - force_state(optional): Whether to approximate state. Defaults and must + approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxRadius: Dummy radius of approximant (i.e. distance from mu0 to farthest sample point). approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'R': rank for Galerkin projection; - 'PODTolerance': tolerance for snapshots POD. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - force_state: Whether to approximate state. + approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. R: Rank for Galerkin projection. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix. bs: List of numpy vectors representing coefficients of linear system RHS. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix. bRBs: List of numpy vectors representing coefficients of compressed linear system RHS. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, - approxParameters : DictAny = {}, force_state : bool = True, + approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): - if not force_state: RROMPyWarning("Overriding force_state to True.") + if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList(["R", "PODTolerance"], ["AUTO", -1]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, - force_state = True, verbosity = verbosity, + approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def tModelType(self): from rrompy.reduction_methods.trained_model import \ TrainedModelReducedBasis return TrainedModelReducedBasis @property def R(self): """Value of R. Its assignment may change S.""" return self._R @R.setter def R(self, R): if R == "AUTO": if not hasattr(self, "_S"): raise RROMPyException(("Cannot assign R automatically without " "S.")) R = self.S if R < 0: raise RROMPyException("R must be non-negative.") self._R = R self._approxParameters["R"] = self.R @property def PODTolerance(self): """Value of PODTolerance.""" return self._PODTolerance @PODTolerance.setter def PODTolerance(self, PODTolerance): self._PODTolerance = PODTolerance self._approxParameters["PODTolerance"] = self.PODTolerance def _setupProjectionMatrix(self): """Compute projection matrix.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of projection matrix.", 7) nsamples = self.samplingEngine.nsamples if self.R > nsamples: RROMPyWarning(("R too large compared to S. Reducing R by " "{}").format(self.R - nsamples)) self.R = nsamples if self.POD: U, s, _ = np.linalg.svd(self.samplingEngine.RPOD) s = s ** 2. else: Gramian = self.HFEngine.innerProduct(self.samplingEngine.samples, self.samplingEngine.samples, is_state = True) U, s, _ = np.linalg.svd(Gramian) snorm = np.cumsum(s[::-1]) / np.sum(s) nPODTrunc = min(nsamples - np.argmax(snorm > self.PODTolerance), self.R) pMat = dot(self.samplingEngine.samples, U[:, : nPODTrunc]) vbMng(self, "MAIN", ("Assembling {}x{} projection matrix from {} " "samples.").format(*(pMat.shape), nsamples), 5) vbMng(self, "DEL", "Done computing projection matrix.", 7) return pMat def setupApprox(self): """Compute RB projection matrix.""" if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() pMat = self._setupProjectionMatrix().data pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} data = self.initializeModelData(datadict)[0] data.affinePoly = self.HFEngine.affinePoly data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) ARBs, bRBs = self.assembleReducedSystem(pMat) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) def assembleReducedSystem(self, pMat : sampList = None, pMatOld : sampList = None)\ -> Tuple[List[Np2D], List[Np1D]]: """Build affine blocks of RB linear system through projections.""" if pMat is None: self.setupApprox() ARBs = self.trainedModel.data.ARBs bRBs = self.trainedModel.data.bRBs else: vbMng(self, "INIT", "Projecting affine terms of HF model.", 10) ARBsOld = None if pMatOld is None else self.trainedModel.data.ARBs bRBsOld = None if pMatOld is None else self.trainedModel.data.bRBs ARBs, bRBs = projectAffineDecomposition(self.HFEngine.As, self.HFEngine.bs, pMat, ARBsOld, bRBsOld, pMatOld) vbMng(self, "DEL", "Done projecting affine terms.", 10) return ARBs, bRBs diff --git a/rrompy/sampling/base/sampling_engine_base.py b/rrompy/sampling/base/sampling_engine_base.py index 63a25fb..d34c933 100644 --- a/rrompy/sampling/base/sampling_engine_base.py +++ b/rrompy/sampling/base/sampling_engine_base.py @@ -1,206 +1,206 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.base.types import (Np1D, HFEng, List, strLst, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyWarning from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import emptySampleList __all__ = ['SamplingEngineBase'] class SamplingEngineBase: """HERE""" - def __init__(self, HFEngine:HFEng, force_state : bool = False, + def __init__(self, HFEngine:HFEng, sample_state : bool = False, verbosity : int = 10, timestamp : bool = True): - self.force_state = force_state + self.sample_state = sample_state self.verbosity = verbosity self.timestamp = timestamp vbMng(self, "INIT", "Initializing sampling engine of type {}.".format(self.name()), 10) self.HFEngine = HFEngine vbMng(self, "DEL", "Done initializing sampling engine.", 10) def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def resetHistory(self): self.samples = emptySampleList() self.nsamples = 0 self.mus = emptyParameterList() self._derIdxs = [] def popSample(self): if hasattr(self, "nsamples") and self.nsamples > 1: if self.samples.shape[1] > self.nsamples: RROMPyWarning(("More than 'nsamples' memory allocated for " "samples. Popping empty sample column.")) self.nsamples += 1 self.nsamples -= 1 self.samples.pop() self.mus.pop() else: self.resetHistory() def preallocateSamples(self, u:sampList, mu:paramVal, n:int): self.samples.reset((u.shape[0], n), u.dtype) self.samples[0] = u mu = checkParameter(mu, self.HFEngine.npar) self.mus.reset((n, self.HFEngine.npar)) self.mus[0] = mu[0] @property def HFEngine(self): """Value of HFEngine. Its assignment resets history.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): self._HFEngine = HFEngine self.resetHistory() def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList: """ Solve linear system. Args: mu: Parameter value. Returns: Solution of system. """ mu = checkParameterList(mu, self.HFEngine.npar)[0] vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15) - u = self.HFEngine.solve(mu, RHS, force_state = self.force_state, + u = self.HFEngine.solve(mu, RHS, return_state = self.sample_state, verbose = (self.verbosity >= 20)) vbMng(self, "DEL", "Done solving HF model.", 15) return u def plotSamples(self, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, plotArgs : dict = {}, **figspecs) -> List[str]: """ Do some nice plots of the samples. Args: warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. plotArgs(optional): Optional arguments for fen/pyplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filenames. """ filesOut = [None] * self.nsamples for j in range(self.nsamples): filesOut[j] = self.HFEngine.plot(self.samples[j], warping, False, "{}_{}".format(name, j), save, what, saveFormat, saveDPI, show, plotArgs, **figspecs) if filesOut[0] is None: return None return filesOut def outParaviewSamples(self, name : str = "u", folders : bool = True, filename : str = "out", times : Np1D = None, what : strLst = 'all', forceNewFile : bool = True, filePW = None) -> List[str]: """ Output samples to ParaView file. Args: name(optional): Base name to be used for data output. folders(optional): Whether to split output in folders. filename(optional): Name of output file. times(optional): Timestamps. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). Returns: Output filenames. """ if times is None: times = [0.] * self.nsamples filesOut = [None] * self.nsamples for j in range(self.nsamples): filesOut[j] = self.HFEngine.outParaview(self.samples[j], None, False, "{}_{}".format(name, j), "{}_{}".format(filename, j), times[j], what, forceNewFile, folders, filePW) if filesOut[0] is None: return None return filesOut def outParaviewTimeDomainSamples(self, omegas : Np1D = None, timeFinal : Np1D = None, periodResolution : int = 20, name : str = "u", folders : bool = True, filename : str = "out", forceNewFile : bool = True) -> List[str]: """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. folders(optional): Whether to split output in folders. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. Returns: Output filename. """ if omegas is None: omegas = np.real(self.mus) if not isinstance(timeFinal, (list, tuple,)): timeFinal = [timeFinal] * self.nsamples filesOut = [None] * self.nsamples for j in range(self.nsamples): filesOut[j] = self.HFEngine.outParaviewTimeDomain(self.samples[j], omegas[j], None, False, timeFinal[j], periodResolution, "{}_{}".format(name, j), "{}_{}".format(filename, j), forceNewFile, folders) if filesOut[0] is None: return None return filesOut diff --git a/rrompy/sampling/base/sampling_engine_base_pivoted.py b/rrompy/sampling/base/sampling_engine_base_pivoted.py index b5358b5..2df5180 100644 --- a/rrompy/sampling/base/sampling_engine_base_pivoted.py +++ b/rrompy/sampling/base/sampling_engine_base_pivoted.py @@ -1,233 +1,233 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.base.types import (Np1D, HFEng, List, ListAny, strLst, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyWarning from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import emptySampleList from .sampling_engine_base import SamplingEngineBase __all__ = ['SamplingEngineBasePivoted'] class SamplingEngineBasePivoted(SamplingEngineBase): """HERE""" def __init__(self, HFEngine:HFEng, directionPivot:ListAny, - force_state : bool = False, verbosity : int = 10, + sample_state : bool = False, verbosity : int = 10, timestamp : bool = True): - super().__init__(HFEngine, force_state, verbosity, timestamp) + super().__init__(HFEngine, sample_state, verbosity, timestamp) self.directionPivot = directionPivot self.HFEngineMarginalized = None self.resetHistory() @property def directionMarginal(self): return tuple([x for x in range(self.HFEngine.npar) \ if x not in self.directionPivot]) @property def nPivot(self): return len(self.directionPivot) @property def nMarginal(self): return len(self.directionMarginal) @property def nsamplesTot(self): return np.sum(self.nsamples) def resetHistory(self, j : int = 1): self.samples = [emptySampleList() for _ in range(j)] self.nsamples = [0] * j self.mus = [emptyParameterList() for _ in range(j)] self._derIdxs = [[] for _ in range(j)] def popSample(self, j:int): if hasattr(self, "nsamples") and self.nsamples[j] > 1: if self.samples[j].shape[1] > self.nsamples[j]: RROMPyWarning(("More than 'nsamples' memory allocated for " "samples. Popping empty sample column.")) self.nsamples[j] += 1 self.nsamples[j] -= 1 self.samples[j].pop() self.mus[j].pop() else: self.resetHistory() def preallocateSamples(self, u:sampList, mu:paramVal, n:int, j:int): self.samples[j].reset((u.shape[0], n), u.dtype) self.samples[j][0] = u mu = checkParameter(mu, self.nPivot) self.mus[j].reset((n, self.nPivot)) self.mus[j][0] = mu[0] def coalesceSamples(self): self.samplesCoalesced = emptySampleList() self.samplesCoalesced.reset((self.samples[0].shape[0], np.sum([samp.shape[1] \ for samp in self.samples])), self.samples[0].dtype) run_idx = 0 for samp in self.samples: slen = samp.shape[1] self.samplesCoalesced.data[:, run_idx : run_idx + slen] = samp.data run_idx += slen def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList: """ Solve linear system. Args: mu: Parameter value. Returns: Solution of system. """ mu = checkParameterList(mu, self.nPivot)[0] vbMng(self, "INIT", ("Solving HF model for muPivot = {} and muMarginal = " "{}.").format(mu, self.HFEngineMarginalized.muFixed), 15) u = self.HFEngineMarginalized.solve(mu, RHS, - force_state = self.force_state, + return_state = self.sample_state, verbose = (self.verbosity >= 20)) vbMng(self, "DEL", "Done solving HF model.", 15) return u def plotSamples(self, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, plotArgs : dict = {}, **figspecs) -> List[str]: """ Do some nice plots of the samples. Args: warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. plotArgs(optional): Optional arguments for fen/pyplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filenames. """ filesOut = [] for i in range(len(self.nsamples)): filesOuti = [None] * self.nsamples[i] for j in range(self.nsamples[i]): filesOuti[j] = self.HFEngine.plot(self.samples[i][j], warping, False, "{}_{}_{}".format(name, i, j), save, what, saveFormat, saveDPI, show, plotArgs, **figspecs) filesOut += [filesOuti] if filesOut[0][0] is None: return None return filesOut def outParaviewSamples(self, name : str = "u", folders : bool = True, filename : str = "out", times : Np1D = None, what : strLst = 'all', forceNewFile : bool = True, filePW = None) -> List[str]: """ Output samples to ParaView file. Args: name(optional): Base name to be used for data output. folders(optional): Whether to split output in folders. filename(optional): Name of output file. times(optional): Timestamps. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). Returns: Output filenames. """ if times is None: times = [[0.] * self.nsamples[i] \ for i in range(len(self.nsamples))] filesOut = [] for i in range(len(self.nsamples)): filesOuti = [None] * self.nsamples[i] for j in range(self.nsamples[i]): filesOuti[j] = self.HFEngine.outParaview( self.samples[i][j], None, False, "{}_{}_{}".format(name, i, j), "{}_{}_{}".format(filename, i, j), times[i][j], what, forceNewFile, folders, filePW) filesOut += [filesOuti] if filesOut[0][0] is None: return None return filesOut def outParaviewTimeDomainSamples(self, omegas : Np1D = None, timeFinal : Np1D = None, periodResolution : int = 20, name : str = "u", folders : bool = True, filename : str = "out", forceNewFile : bool = True) -> List[str]: """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. folders(optional): Whether to split output in folders. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. Returns: Output filenames. """ if omegas is None: omegas = [[np.real(self.mus[i])] \ for i in range(len(self.nsamples))] if not isinstance(timeFinal, (list, tuple,)): timeFinal = [[timeFinal] * self.nsamples[i] \ for i in range(len(self.nsamples))] filesOut = [] for i in range(len(self.nsamples)): filesOuti = [None] * self.nsamples[i] for j in range(self.nsamples[i]): filesOuti[j] = self.HFEngine.outParaviewTimeDomain( self.samples[i][j], omegas[i][j], None, False, timeFinal[i][j], periodResolution, "{}_{}_{}".format(name, i, j), "{}_{}_{}".format(filename, i, j), forceNewFile, folders) filesOut += [filesOuti] if filesOut[0][0] is None: return None return filesOut diff --git a/rrompy/sampling/pivoted/sampling_engine_pivoted.py b/rrompy/sampling/pivoted/sampling_engine_pivoted.py index bc2012c..343fa3c 100644 --- a/rrompy/sampling/pivoted/sampling_engine_pivoted.py +++ b/rrompy/sampling/pivoted/sampling_engine_pivoted.py @@ -1,131 +1,131 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.sampling.base.sampling_engine_base_pivoted import ( SamplingEngineBasePivoted) from rrompy.hfengines.base import MarginalProxyEngine from rrompy.utilities.base.types import Np1D, paramVal, paramList, sampList from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.exception_manager import RROMPyException from rrompy.utilities.numerical import nextDerivativeIndices, dot from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import sampleList __all__ = ['SamplingEnginePivoted'] class SamplingEnginePivoted(SamplingEngineBasePivoted): """HERE""" def preprocesssamples(self, idxs:Np1D, j:int) -> sampList: if self.samples[j] is None or len(self.samples[j]) == 0: return return self.samples[j](idxs) def setsample(self, u:sampList, j:int, overwrite : bool = False) -> Np1D: if overwrite: self.samples[j][self.nsamples[j]] = u else: if self.nsamples[j] == 0: self.samples[j] = sampleList(u) else: self.samples[j].append(u) def postprocessu(self, u:sampList, j:int, overwrite : bool = False): self.setsample(u, j, overwrite) def postprocessuBulk(self, j:int): pass def _getSampleConcurrence(self, mu:paramVal, j:int, previous:Np1D) -> sampList: - if not (self.force_state or self.HFEngine.isCEye): + if not (self.sample_state or self.HFEngine.isCEye): raise RROMPyException(("Derivatives of solution with non-scalar " "C not computable.")) if not self.HFEngine._isStateShiftZero: raise RROMPyException(("Derivatives of solution with non-zero " "solution shift not computable.")) if len(previous) >= len(self._derIdxs[j]): self._derIdxs[j] += nextDerivativeIndices( self._derIdxs[j], self.nPivot, len(previous) + 1 - len(self._derIdxs[j])) derIdx = self._derIdxs[j][len(previous)] mu = checkParameter(mu, self.nPivot) samplesOld = self.preprocesssamples(previous, j) RHS = self.HFEngineMarginalized.b(mu, derIdx) for j, derP in enumerate(self._derIdxs[j][: len(previous)]): diffP = [x - y for (x, y) in zip(derIdx, derP)] if np.all([x >= 0 for x in diffP]): RHS -= dot(self.HFEngineMarginalized.A(mu, diffP), samplesOld[j]) return self.solveLS(mu, RHS = RHS) def nextSample(self, mu:paramVal, j:int, overwrite : bool = False, postprocess : bool = True) -> Np1D: mu = checkParameter(mu, self.nPivot) muidxs = self.mus[j].findall(mu[0]) if len(muidxs) > 0: u = self._getSampleConcurrence(mu, j, np.sort(muidxs)) else: u = self.solveLS(mu) if postprocess: self.postprocessu(u, j, overwrite = overwrite) else: self.setsample(u, j, overwrite) if overwrite: self.mus[j][self.nsamples[j]] = mu[0] else: self.mus[j].append(mu) self.nsamples[j] += 1 return self.samples[j][self.nsamples[j] - 1] def iterSample(self, mus:paramList, musM:paramList) -> sampList: mus = checkParameterList(mus, self.nPivot)[0] musM = checkParameterList(musM, self.nMarginal)[0] vbMng(self, "INIT", "Starting sampling iterations.", 5) n = len(mus) m = len(musM) if n <= 0: raise RROMPyException("Number of samples must be positive.") if m <= 0: raise RROMPyException(("Number of marginal samples must be " "positive.")) repeatedSamples = len(mus.unique()) != n for j in range(m): muMEff = [fp] * self.HFEngine.npar for k, x in enumerate(self.directionMarginal): muMEff[x] = musM(j, k) self.HFEngineMarginalized = MarginalProxyEngine(self.HFEngine, list(muMEff)) if repeatedSamples: for k in range(n): vbMng(self, "MAIN", "Computing sample {} / {} for marginal {} / {}."\ .format(k + 1, n, j, m), 10) self.nextSample(mus[k], j, overwrite = (k > 0), postprocess = False) if n > 1 and k == 0: self.preallocateSamples(self.samples[j][0], mus[0], n, j) else: self.samples[j] = self.postprocessuBulk(self.solveLS(mus), j) self.mus[j] = copy(mus) self.nsamples[j] = n self.postprocessuBulk(j) vbMng(self, "DEL", "Finished sampling iterations.", 5) return self.samples[j] diff --git a/rrompy/sampling/pivoted/sampling_engine_pivoted_pod.py b/rrompy/sampling/pivoted/sampling_engine_pivoted_pod.py index 674c987..815b033 100644 --- a/rrompy/sampling/pivoted/sampling_engine_pivoted_pod.py +++ b/rrompy/sampling/pivoted/sampling_engine_pivoted_pod.py @@ -1,124 +1,124 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.sampling.base.pod_engine import PODEngine from .sampling_engine_pivoted import SamplingEnginePivoted from rrompy.utilities.base.types import Np1D, paramVal, sampList from rrompy.utilities.base import verbosityManager as vbMng from rrompy.sampling import sampleList, emptySampleList __all__ = ['SamplingEnginePivotedPOD'] class SamplingEnginePivotedPOD(SamplingEnginePivoted): """HERE""" def resetHistory(self, j : int = 1): super().resetHistory(j) self.samples_full = [emptySampleList() for _ in range(j)] self.RPOD = [np.zeros((0, 0), dtype = np.complex) for _ in range(j)] def popSample(self, j:int): if hasattr(self, "nsamples") and self.nsamples[j] > 1: self.RPOD[j] = self.RPOD[j][: -1, : -1] self.samples_full[j].pop() super().popSample(j) def coalesceSamples(self, tol : float = 1e-12): super().coalesceSamples() self.samplesCoalesced, RPODC = self.PODEngine.generalizedQR( - self.samplesCoalesced, - is_state = self.force_state) + self.samplesCoalesced, + is_state = self.sample_state) self.RPODCoalesced = np.zeros((self.samplesCoalesced.shape[1], self.samplesCoalesced.shape[1]), dtype = self.RPOD[0].dtype) self.samples_fullCoalesced = emptySampleList() self.samples_fullCoalesced.reset((self.samples_full[0].shape[0], self.samplesCoalesced.shape[1]), self.samples_full[0].dtype) ci = 0 for j, (Rloc, samp) in enumerate(zip(self.RPOD, self.samples_full)): ri = 0 Rheg = Rloc.shape[1] for k, Rloc2 in enumerate(self.RPOD[: j + 1]): Rlen = Rloc2.shape[1] self.RPODCoalesced[ri : ri + Rlen, ci : ci + Rheg] = ( RPODC[ri : ri + Rlen, ci : ci + Rheg].dot(Rloc)) ri += Rlen self.samples_fullCoalesced.data[:, ci : ci + Rheg] = samp.data ci += Rheg RCdiag = np.abs(np.diag(self.RPODCoalesced)) RCdiag /= RCdiag[0] ntrunc = np.nonzero(RCdiag < tol)[0] if len(ntrunc) == 0: return self.samplesCoalesced.data = self.samplesCoalesced.data[:, : ntrunc[0]] self.RPODCoalesced = self.RPODCoalesced[: ntrunc[0], :] @property def HFEngine(self): """Value of HFEngine. Its assignment resets history.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): self._HFEngine = HFEngine self.resetHistory() self.PODEngine = PODEngine(self._HFEngine) def preprocesssamples(self, idxs:Np1D, j:int) -> sampList: if self.samples_full[j] is None or len(self.samples_full[j]) == 0: return return self.samples_full[j](idxs) def setsample(self, u:sampList, j:int, overwrite : bool = False): super().setsample(u, j, overwrite) if overwrite: self.samples_full[j][self.nsamples[j]] = u else: if self.nsamples[j] == 0: self.samples_full[j] = sampleList(u) else: self.samples_full[j].append(u) def postprocessu(self, u:sampList, j:int, overwrite : bool = False): if overwrite: self.samples_full[j][self.nsamples[j]] = u else: if self.nsamples[j] == 0: self.samples_full[j] = sampleList(u) else: self.samples_full[j].append(u) vbMng(self, "INIT", "Starting orthogonalization.", 20) u, r, _ = self.PODEngine.GS(u, self.samples[j], - is_state = self.force_state) + is_state = self.sample_state) self.RPOD[j] = np.pad(self.RPOD[j], ((0, 1), (0, 1)), 'constant') self.RPOD[j][:, -1] = r vbMng(self, "DEL", "Done orthogonalizing.", 20) super().setsample(u, j, overwrite) def postprocessuBulk(self, j:int): vbMng(self, "INIT", "Starting orthogonalization for marginal no {}.".format(j), 40) u, self.RPOD[j] = self.PODEngine.generalizedQR(self.samples_full[j], - is_state = self.force_state) + is_state = self.sample_state) vbMng(self, "DEL", "Done orthogonalizing.", 40) self.samples[j] = sampleList(u) def preallocateSamples(self, u:Np1D, mu:paramVal, n:int, j:int): super().preallocateSamples(u, mu, n, j) self.samples_full[j].reset((u.shape[0], n), u.dtype) self.samples_full[j][0] = u diff --git a/rrompy/sampling/standard/sampling_engine_standard.py b/rrompy/sampling/standard/sampling_engine_standard.py index d6c0c83..82a08f1 100644 --- a/rrompy/sampling/standard/sampling_engine_standard.py +++ b/rrompy/sampling/standard/sampling_engine_standard.py @@ -1,115 +1,115 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.sampling.base.sampling_engine_base import SamplingEngineBase from rrompy.utilities.base.types import Np1D, paramVal, paramList, sampList from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyException from rrompy.utilities.numerical import nextDerivativeIndices, dot from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import sampleList __all__ = ['SamplingEngineStandard'] class SamplingEngineStandard(SamplingEngineBase): """HERE""" def preprocesssamples(self, idxs:Np1D) -> sampList: if self.samples is None or len(self.samples) == 0: return return self.samples(idxs) def setsample(self, u:sampList, overwrite : bool = False): if overwrite: self.samples[self.nsamples] = u else: if self.nsamples == 0: self.samples = sampleList(u) else: self.samples.append(u) def postprocessu(self, u:sampList, overwrite : bool = False): self.setsample(u, overwrite) def postprocessuBulk(self): pass def _getSampleConcurrence(self, mu:paramVal, previous:Np1D) -> sampList: - if not (self.force_state or self.HFEngine.isCEye): + if not (self.sample_state or self.HFEngine.isCEye): raise RROMPyException(("Derivatives of solution with non-scalar " "C not computable.")) if not self.HFEngine._isStateShiftZero: raise RROMPyException(("Derivatives of solution with non-zero " "solution shift not computable.")) if len(previous) >= len(self._derIdxs): self._derIdxs += nextDerivativeIndices(self._derIdxs, self.HFEngine.npar, len(previous) + 1 - len(self._derIdxs)) derIdx = self._derIdxs[len(previous)] mu = checkParameter(mu, self.HFEngine.npar) samplesOld = self.preprocesssamples(previous) RHS = self.HFEngine.b(mu, derIdx) for j, derP in enumerate(self._derIdxs[: len(previous)]): diffP = [x - y for (x, y) in zip(derIdx, derP)] if np.all([x >= 0 for x in diffP]): RHS -= dot(self.HFEngine.A(mu, diffP), samplesOld[j]) return self.solveLS(mu, RHS = RHS) def nextSample(self, mu : paramVal = [], overwrite : bool = False, postprocess : bool = True) -> Np1D: mu = checkParameter(mu, self.HFEngine.npar) muidxs = self.mus.findall(mu[0]) if len(muidxs) > 0: u = self._getSampleConcurrence(mu, np.sort(muidxs)) else: u = self.solveLS(mu) if postprocess: self.postprocessu(u, overwrite = overwrite) else: self.setsample(u, overwrite) if overwrite: self.mus[self.nsamples] = mu[0] else: self.mus.append(mu) self.nsamples += 1 return self.samples[self.nsamples - 1] def iterSample(self, mus:paramList) -> sampList: mus = checkParameterList(mus, self.HFEngine.npar)[0] vbMng(self, "INIT", "Starting sampling iterations.", 5) n = len(mus) if n <= 0: raise RROMPyException(("Number of samples must be positive.")) self.resetHistory() if len(mus.unique()) != n: for j in range(n): vbMng(self, "MAIN", "Computing sample {} / {}.".format(j + 1, n), 7) self.nextSample(mus[j], overwrite = (j > 0), postprocess = False) if n > 1 and j == 0: self.preallocateSamples(self.samples[0], mus[0], n) else: self.setsample(self.solveLS(mus), overwrite = False) self.mus = copy(mus) self.nsamples = n self.postprocessuBulk() vbMng(self, "DEL", "Finished sampling iterations.", 5) return self.samples diff --git a/rrompy/sampling/standard/sampling_engine_standard_pod.py b/rrompy/sampling/standard/sampling_engine_standard_pod.py index 3297fce..3c9a3b9 100644 --- a/rrompy/sampling/standard/sampling_engine_standard_pod.py +++ b/rrompy/sampling/standard/sampling_engine_standard_pod.py @@ -1,91 +1,91 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.sampling.base.pod_engine import PODEngine from .sampling_engine_standard import SamplingEngineStandard from rrompy.utilities.base.types import Np1D, paramVal, sampList from rrompy.utilities.base import verbosityManager as vbMng from rrompy.sampling import sampleList, emptySampleList __all__ = ['SamplingEngineStandardPOD'] class SamplingEngineStandardPOD(SamplingEngineStandard): """HERE""" def resetHistory(self): super().resetHistory() self.samples_full = emptySampleList() self.RPOD = np.zeros((0, 0), dtype = np.complex) def popSample(self): if hasattr(self, "nsamples") and self.nsamples > 1: self.RPOD = self.RPOD[: -1, : -1] self.samples_full.pop() super().popSample() @property def HFEngine(self): """Value of HFEngine. Its assignment resets history.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): SamplingEngineStandard.HFEngine.fset(self, HFEngine) self.PODEngine = PODEngine(self._HFEngine) def preprocesssamples(self, idxs:Np1D) -> sampList: if self.samples_full is None or len(self.samples_full) == 0: return return self.samples_full(idxs) def setsample(self, u:sampList, overwrite : bool = False): super().setsample(u, overwrite) if overwrite: self.samples_full[self.nsamples] = u else: if self.nsamples == 0: self.samples_full = sampleList(u) else: self.samples_full.append(u) def postprocessu(self, u:sampList, overwrite : bool = False): if overwrite: self.samples_full[self.nsamples] = u else: if self.nsamples == 0: self.samples_full = sampleList(u) else: self.samples_full.append(u) vbMng(self, "INIT", "Starting orthogonalization.", 20) u, r, _ = self.PODEngine.GS(u, self.samples, - is_state = self.force_state) + is_state = self.sample_state) self.RPOD = np.pad(self.RPOD, ((0, 1), (0, 1)), 'constant') self.RPOD[:, -1] = r vbMng(self, "DEL", "Done orthogonalizing.", 20) super().setsample(u, overwrite) def postprocessuBulk(self): vbMng(self, "INIT", "Starting orthogonalization.", 10) u, self.RPOD = self.PODEngine.generalizedQR(self.samples_full, - is_state = self.force_state) + is_state = self.sample_state) vbMng(self, "DEL", "Done orthogonalizing.", 10) self.samples = sampleList(u) def preallocateSamples(self, u:Np1D, mu:paramVal, n:int): super().preallocateSamples(u, mu, n) self.samples_full.reset((u.shape[0], n), u.dtype) self.samples_full[0] = u