diff --git a/VERSION b/VERSION index b123147..8cfbc90 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1 \ No newline at end of file +1.1.1 \ No newline at end of file diff --git a/rrompy/hfengines/base/boundary_conditions.py b/rrompy/hfengines/base/boundary_conditions.py index 77b1ad5..fffe262 100644 --- a/rrompy/hfengines/base/boundary_conditions.py +++ b/rrompy/hfengines/base/boundary_conditions.py @@ -1,126 +1,126 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from fenics import SubDomain, AutoSubDomain from rrompy.utilities.base.types import GenExpr from rrompy.utilities.base.fenics import bdrFalse from rrompy.utilities.exception_manager import RROMPyException __all__ = ['BoundaryConditions'] class BoundaryConditions: """ Boundary conditions manager. Attributes: DirichletBoundary: Callable returning True when on Dirichlet boundary. NeumannBoundary: Callable returning True when on Neumann boundary. RobinBoundary: Callable returning True when on Robin boundary. """ allowedKinds = ["Dirichlet", "Neumann", "Robin"] def __init__(self, kind : str = None): if kind is None: return kind = kind[0].upper() + kind[1:].lower() if kind in self.allowedKinds: getattr(self.__class__, kind + "Boundary", None).fset(self, "ALL") else: raise RROMPyException("BC kind not recognized.") def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) - def __generalManagement(self, kind:str, value:GenExpr): + def _generalManagement(self, kind:str, value:GenExpr): if isinstance(value, (str,)): value = value.upper() if value.upper() == "ALL": - self.__complementaryManagementAll(kind) + self._complementaryManagementAll(kind) elif value.upper() == "REST": - self.__complementaryManagementRest(kind) + self._complementaryManagementRest(kind) else: raise RROMPyException("Wildcard not recognized.") elif callable(value): - self.__standardManagementCallable(kind, value) + self._standardManagementCallable(kind, value) elif isinstance(value, (SubDomain,)): - self.__standardManagement(kind, value) + self._standardManagement(kind, value) else: raise RROMPyException(kind + "Boundary type not recognized.") - def __complementaryManagementAll(self, kind:str): + def _complementaryManagementAll(self, kind:str): if kind not in self.allowedKinds: raise RROMPyException("BC kind not recognized.") for k in self.allowedKinds: if k != kind: - self.__standardManagementCallable(k, bdrFalse) - self.__complementaryManagementRest(kind) + self._standardManagementCallable(k, bdrFalse) + self._complementaryManagementRest(kind) - def __complementaryManagementRest(self, kind:str): + def _complementaryManagementRest(self, kind:str): if kind not in self.allowedKinds: raise RROMPyException("BC kind not recognized.") otherBCs = [] for k in self.allowedKinds: if k != kind: if hasattr(self, "_" + k + "Rest"): - self.__standardManagement(k, bdrFalse) + self._standardManagement(k, bdrFalse) otherBCs += [getattr(self, k + "Boundary")] def restCall(x, on_boundary): return (on_boundary and not any([bc.inside(x, on_boundary) for bc in otherBCs])) - self.__standardManagementCallable(kind, restCall) + self._standardManagementCallable(kind, restCall) super().__setattr__("_" + kind + "Rest", 1) - def __standardManagementCallable(self, kind:str, bc:callable): + def _standardManagementCallable(self, kind:str, bc:callable): bcSD = AutoSubDomain(bc) - self.__standardManagement(kind, bcSD) + self._standardManagement(kind, bcSD) - def __standardManagement(self, kind:str, bc:SubDomain): + def _standardManagement(self, kind:str, bc:SubDomain): super().__setattr__("_" + kind + "Boundary", bc) if hasattr(self, "_" + kind + "Rest"): super().__delattr__("_" + kind + "Rest") @property def DirichletBoundary(self): """Function handle to DirichletBoundary.""" return self._DirichletBoundary @DirichletBoundary.setter def DirichletBoundary(self, DirichletBoundary): - self.__generalManagement("Dirichlet", DirichletBoundary) + self._generalManagement("Dirichlet", DirichletBoundary) @property def NeumannBoundary(self): """Function handle to NeumannBoundary.""" return self._NeumannBoundary @NeumannBoundary.setter def NeumannBoundary(self, NeumannBoundary): - self.__generalManagement("Neumann", NeumannBoundary) + self._generalManagement("Neumann", NeumannBoundary) @property def RobinBoundary(self): """Function handle to RobinBoundary.""" return self._RobinBoundary @RobinBoundary.setter def RobinBoundary(self, RobinBoundary): - self.__generalManagement("Robin", RobinBoundary) + self._generalManagement("Robin", RobinBoundary) diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py index b74369f..ac6b75e 100644 --- a/rrompy/reduction_methods/base/generic_approximant.py +++ b/rrompy/reduction_methods/base/generic_approximant.py @@ -1,616 +1,616 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import pickle import numpy as np from itertools import product as iterprod from copy import deepcopy as copy from rrompy.sampling.base.sampling_engine_base import SamplingEngineBase from rrompy.utilities.base.types import Np1D, DictAny, HFEng, sampleEng, strLst from rrompy.utilities.base import purgeDict, verbosityDepth, getNewFilename from rrompy.utilities.exception_manager import (RROMPyException, modeAssert, RROMPy_READY, RROMPy_FRAGILE) __all__ = ['GenericApproximant'] def addNormFieldToClass(self, fieldName): def objFunc(self, mu:complex, homogeneized : bool = False) -> float: getObj = getattr(self.__class__, "get" + fieldName) return self.HFEngine.norm(getObj(self, mu, homogeneized)) setattr(self.__class__, "norm" + fieldName, objFunc) def addPlotFieldToClass(self, fieldName): def objFunc(self, mu:complex, name : str = fieldName, save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, homogeneized : bool = False, **figspecs): uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized) self.HFEngine.plot(uV, name = name, save = save, what = what, saveFormat = saveFormat, saveDPI = saveDPI, show = show, **figspecs) setattr(self.__class__, "plot" + fieldName, objFunc) def addOutParaviewFieldToClass(self, fieldName): def objFunc(self, mu:complex, name : str = fieldName, filename : str = "out", time : float = 0., what : strLst = 'all', forceNewFile : bool = True, folder : bool = False, filePW = None, homogeneized : bool = False): uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized) self.HFEngine.outParaview(uV, name = name, filename = filename, time = time, what = what, forceNewFile = forceNewFile, folder = folder, filePW = filePW) setattr(self.__class__, "outParaview" + fieldName, objFunc) def addOutParaviewTimeDomainFieldToClass(self, fieldName): def objFunc(self, mu:complex, omega : float = None, timeFinal : float = None, periodResolution : int = 20, name : str = fieldName, filename : str = "out", forceNewFile : bool = True, folder : bool = False, homogeneized : bool = False): uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized) if omega is None: omega = np.real(mu) self.HFEngine.outParaviewTimeDomain(uV, omega = omega, timeFinal = timeFinal, periodResolution = periodResolution, name = name, filename = filename, forceNewFile = forceNewFile, folder = folder) setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc) class GenericApproximant: """ ABSTRACT ROM approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. trainedModel: Trained model evaluator. mu0: Default parameter. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. """ __all__ += [ftype + dtype for ftype, dtype in iterprod( ["norm", "plot", "outParaview", "outParaviewTimeDomain"], ["HF", "RHS", "Approx", "Res", "Err"])] def __init__(self, HFEngine:HFEng, mu0 : complex = 0, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() + self._preInit() self._mode = RROMPy_READY self.verbosity = verbosity self.timestamp = timestamp if self.verbosity >= 10: verbosityDepth("INIT", ("Initializing approximant engine of " "type {}.").format(self.name()), timestamp = self.timestamp) self.HFEngine = HFEngine - self.__addParametersToList(["POD"]) + self._addParametersToList(["POD"]) self.mu0 = mu0 self.homogeneized = homogeneized self.approxParameters = approxParameters - self.__postInit() + self._postInit() ### add norm{HF,RHS,Approx,Res,Err} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Target norm of *. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addNormFieldToClass(self, objName) ### add plot{HF,RHS,Approx,Res,Err} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addPlotFieldToClass(self, objName) ### add outParaview{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file. Args: mu: Target parameter. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewFieldToClass(self, objName) ### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file, converted to time domain. Args: mu: Target parameter. omega(optional): frequency. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewTimeDomainFieldToClass(self, objName) - def __preInit(self): + def _preInit(self): if not hasattr(self, "depth"): self.depth = 0 else: self.depth += 1 - def __addParametersToList(self, what:strLst): + def _addParametersToList(self, what:strLst): if not hasattr(self, "parameterList"): self.parameterList = [] self.parameterList += what - def __postInit(self): + def _postInit(self): if self.depth == 0: if self.verbosity >= 10: verbosityDepth("DEL", "Done initializing.", timestamp = self.timestamp) del self.depth else: self.depth -= 1 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def setupSampling(self, SamplingEngine : sampleEng = SamplingEngineBase): """Setup sampling engine.""" modeAssert(self._mode, message = "Cannot setup sampling engine.") self.samplingEngine = SamplingEngine(self.HFEngine, verbosity = self.verbosity) @property def mu0(self): """Value of mu0.""" return self._mu0 @mu0.setter def mu0(self, mu0): if not (hasattr(self, "_mu0") and np.isclose(mu0, self.mu0)): self.resetSamples() self._mu0 = mu0 @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): if not hasattr(self, "approxParameters"): self._approxParameters = {} approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) keyList = list(approxParameters.keys()) if "POD" in keyList: self.POD = approxParameters["POD"] elif not hasattr(self, "_POD") or self._POD is None: self.POD = True @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): if hasattr(self, "_POD"): PODold = self.POD else: PODold = -1 self._POD = POD self._approxParameters["POD"] = self.POD if PODold != self.POD: self.samplingEngine = None self.resetSamples() @property def homogeneized(self): """Value of homogeneized.""" return self._homogeneized @homogeneized.setter def homogeneized(self, homogeneized): if not hasattr(self, "_homogeneized"): self._homogeneized = None if homogeneized != self.homogeneized: self._homogeneized = homogeneized self.resetSamples() def solveHF(self, mu : complex = None): """ Find high fidelity solution with original parameters and arbitrary parameter. Args: mu: Target parameter. """ if mu is None: mu = self.mu0 if (not hasattr(self, "lastSolvedHF") or not np.isclose(self.lastSolvedHF, mu)): self.uHF = self.samplingEngine.solveLS(mu, homogeneized = self.homogeneized) self.lastSolvedHF = mu def resetSamples(self): """Reset samples.""" if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() else: self.setupSampling() self.trainedModel = None self._mode = RROMPy_READY def plotSamples(self, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, **figspecs): """ Do some nice plots of the samples. Args: name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ modeAssert(self._mode, message = "Cannot plot samples.") self.samplingEngine.plotSamples(name = name, save = save, what = what, saveFormat = saveFormat, saveDPI = saveDPI, **figspecs) def outParaviewSamples(self, name : str = "u", filename : str = "out", times : Np1D = None, what : strLst = 'all', forceNewFile : bool = True, folders : bool = False, filePW = None): """ Output samples to ParaView file. Args: name(optional): Base name to be used for data output. filename(optional): Name of output file. times(optional): Timestamps. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. filePW(optional): Fenics File entity (for time series). """ modeAssert(self._mode, message = "Cannot output samples.") self.samplingEngine.outParaviewSamples(name = name, filename = filename, times = times, what = what, forceNewFile = forceNewFile, folders = folders, filePW = filePW) def outParaviewTimeDomainSamples(self, omegas : Np1D = None, timeFinal : Np1D = None, periodResolution : int = 20, name : str = "u", filename : str = "out", forceNewFile : bool = True, folders : bool = False): """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. """ modeAssert(self._mode, message = "Cannot output samples.") self.samplingEngine.outParaviewTimeDomainSamples(omegas = omegas, timeFinal = timeFinal, periodResolution = periodResolution, name = name, filename = filename, forceNewFile = forceNewFile, folders = folders) @abstractmethod def setupApprox(self): """ Setup approximant. (ABSTRACT) Any specialization should include something like if self.checkComputedApprox(): return modeAssert(self._mode, message = "Cannot setup approximant.") ... self.trainedModel = ... self.trainedModel.data = ... self.trainedModel.data.approxParameters = copy( self.approxParameters) """ pass def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None and self.trainedModel.data.approxParameters == self.approxParameters) def evalApproxReduced(self, mu:complex): """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ self.setupApprox() self.uAppReduced = self.trainedModel.getApproxReduced(mu) def evalApprox(self, mu:complex): """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. """ self.setupApprox() self.uApp = self.trainedModel.getApprox(mu) def getHF(self, mu:complex, homogeneized : bool = False) -> Np1D: """ Get HF solution at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: HFsolution. """ self.solveHF(mu) if self.homogeneized and not homogeneized: return self.uHF + self.HFEngine.liftDirichletData(mu) if not self.homogeneized and homogeneized: return self.uHF - self.HFEngine.liftDirichletData(mu) return self.uHF def getRHS(self, mu:complex, homogeneized : bool = False) -> Np1D: """ Get linear system RHS at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Linear system RHS. """ return self.HFEngine.residual(None, mu, homogeneized = homogeneized) def getApproxReduced(self, mu:complex) -> Np1D: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Reduced approximant. """ self.evalApproxReduced(mu) return self.uAppReduced def getApprox(self, mu:complex, homogeneized : bool = False) -> Np1D: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Approximant. """ self.evalApprox(mu) if self.homogeneized and not homogeneized: return self.uApp + self.HFEngine.liftDirichletData(mu) if not self.homogeneized and homogeneized: return self.uApp - self.HFEngine.liftDirichletData(mu) return self.uApp def getRes(self, mu:complex, homogeneized : bool = False) -> Np1D: """ Get residual at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Approximant residual. """ return self.HFEngine.residual(self.getApprox(mu, homogeneized), mu, homogeneized = homogeneized) def getErr(self, mu:complex, homogeneized : bool = False) -> Np1D: """ Get error at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Approximant error. """ return self.getApprox(mu, homogeneized) - self.getHF(mu, homogeneized) def getPoles(self) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ self.setupApprox() if self.verbosity >= 20: verbosityDepth("INIT", "Computing poles of model.", timestamp = self.timestamp) poles = self.trainedModel.getPoles() if self.verbosity >= 20: verbosityDepth("DEL", "Done computing poles.", timestamp = self.timestamp) return poles def storeTrainedModel(self, filenameBase : str = "trained_model", forceNewFile : bool = True): """Store trained reduced model to file.""" self.setupApprox() if self.verbosity >= 20: verbosityDepth("INIT", "Storing trained model to file.", timestamp = self.timestamp) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) with open(filename, "wb") as fileOut: pickle.dump(self.trainedModel.data.__dict__, fileOut) if self.verbosity >= 20: verbosityDepth("DEL", "Done storing trained model.", timestamp = self.timestamp) return filename def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" if self.verbosity >= 20: verbosityDepth("INIT", "Loading pre-trained model from file.", timestamp = self.timestamp) with open(filename, "rb") as fileIn: datadict = pickle.load(fileIn) name = datadict.pop("name") if name == "TrainedModelPade": from rrompy.reduction_methods.trained_model import \ TrainedModelPade as tModel elif name == "TrainedModelRB": from rrompy.reduction_methods.trained_model import \ TrainedModelRB as tModel else: raise RROMPyException(("Trained model name not recognized. " "Loading failed.")) self.mu0 = datadict.pop("mu0") from rrompy.reduction_methods.trained_model import TrainedModelData trainedModel = tModel() trainedModel.verbosity = self.verbosity trainedModel.timestamp = self.timestamp data = TrainedModelData(name, self.mu0, datadict.pop("projMat"), datadict.pop("rescalingExp")) if "mus" in datadict: data.mus = datadict.pop("mus") approxParameters = datadict.pop("approxParameters") data.approxParameters = copy(approxParameters) if "sampler" in approxParameters: self._approxParameters["sampler"] = approxParameters.pop("sampler") self.approxParameters = copy(approxParameters) if "mus" in data.__dict__: self.mus = np.copy(data.mus) if name == "TrainedModelPade": self.scaleFactor = datadict.pop("scaleFactor") data.scaleFactor = self.scaleFactor for key in datadict: setattr(data, key, datadict[key]) trainedModel.data = data self.trainedModel = trainedModel self._mode = RROMPy_FRAGILE if self.verbosity >= 20: verbosityDepth("DEL", "Done loading pre-trained model.", timestamp = self.timestamp) diff --git a/rrompy/reduction_methods/lagrange/approximant_lagrange_pade.py b/rrompy/reduction_methods/lagrange/approximant_lagrange_pade.py index e953ced..6125e40 100644 --- a/rrompy/reduction_methods/lagrange/approximant_lagrange_pade.py +++ b/rrompy/reduction_methods/lagrange/approximant_lagrange_pade.py @@ -1,520 +1,520 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import copy import numpy as np from scipy.special import factorial as fact from rrompy.reduction_methods.base import checkRobustTolerance from .generic_approximant_lagrange import GenericApproximantLagrange from rrompy.utilities.poly_fitting import (polybases, polyvander, polyfitname, customFit) from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import Np1D, Np2D, HFEng, DictAny, Tuple from rrompy.utilities.base import verbosityDepth, purgeDict from rrompy.utilities.exception_manager import (RROMPyException, modeAssert, RROMPyWarning) __all__ = ['ApproximantLagrangePade'] class ApproximantLagrangePade(GenericApproximantLagrange): """ ROM Lagrange Pade' interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; defaults to 2; - 'sampler': sample point generator; defaults to uniform sampler on [0, 1]; - 'polybasis': type of polynomial basis for interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'E': coefficient of interpolant to be minimized; defaults to min(S, M + 1); - 'M': degree of Pade' interpolant numerator; defaults to 0; - 'N': degree of Pade' interpolant denominator; defaults to 0; - 'interpRcond': tolerance for interpolation via numpy.polyfit; defaults to None; - 'robustTol': tolerance for robust Pade' denominator management; defaults to 0. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. ws: Array of snapshot weigths. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; - 'E': coefficient of interpolant to be minimized; - 'M': degree of Pade' interpolant numerator; - 'N': degree of Pade' interpolant denominator; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust Pade' denominator management. extraApproxParameters: List of approxParameters keys in addition to mother class's. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. M: Numerator degree of approximant. N: Denominator degree of approximant. POD: Whether to compute POD of snapshots. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust Pade' denominator management. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. uApp: Last evaluated approximant as numpy complex vector. """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0., approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["polybasis", "E", "M", "N", + self._preInit() + self._addParametersToList(["polybasis", "E", "M", "N", "interpRcond", "robustTol"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) - self.__postInit() + self._postInit() @property def approxParameters(self): """ Value of approximant parameters. """ return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["polybasis", "E", "M", "N", "interpRcond", "robustTol"], True, True, baselevel = 1) if hasattr(self, "_M") and self.M is not None: Mold = self.M self._M = 0 if hasattr(self, "_N") and self.N is not None: Nold = self.N self._N = 0 if hasattr(self, "_E") and self.E is not None: self._E = 0 GenericApproximantLagrange.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) if "polybasis" in keyList: self.polybasis = approxParameters["polybasis"] elif not hasattr(self, "_polybasis") or self._polybasis is None: self.polybasis = "MONOMIAL" if "interpRcond" in keyList: self.interpRcond = approxParameters["interpRcond"] elif not hasattr(self, "interpRcond") or self.interpRcond is None: self.interpRcond = None if "robustTol" in keyList: self.robustTol = approxParameters["robustTol"] elif not hasattr(self, "_robustTol") or self._robustTol is None: self.robustTol = 0 if "M" in keyList: self.M = approxParameters["M"] elif hasattr(self, "_M") and self.M is not None: self.M = Mold else: self.M = 0 if "N" in keyList: self.N = approxParameters["N"] elif hasattr(self, "_N") and self.N is not None: self.N = Nold else: self.N = 0 if "E" in keyList: self.E = approxParameters["E"] else: self.E = min(self.S - 1, self.M + 1) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._sampleType = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def interpRcond(self): """Value of interpRcond.""" return self._interpRcond @interpRcond.setter def interpRcond(self, interpRcond): self._interpRcond = interpRcond self._approxParameters["interpRcond"] = self.interpRcond @property def M(self): """Value of M. Its assignment may change S.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M if hasattr(self, "_S") and self.S < self.M + 1: RROMPyWarning("Prescribed S is too small. Updating S to M + 1.") self.S = self.M + 1 @property def N(self): """Value of N. Its assignment may change S.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N if hasattr(self, "_S") and self.S < self.N + 1: RROMPyWarning("Prescribed S is too small. Updating S to N + 1.") self.S = self.N + 1 @property def E(self): """Value of E. Its assignment may change S.""" return self._E @E.setter def E(self, E): if E < 0: raise RROMPyException("E must be non-negative.") self._E = E self._approxParameters["E"] = self.E if hasattr(self, "_S") and self.S < self.E + 1: RROMPyWarning("Prescribed S is too small. Updating S to E + 1.") self.S = self.E + 1 @property def robustTol(self): """Value of tolerance for robust Pade' denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if S <= 0: raise RROMPyException("S must be positive.") if hasattr(self, "_S"): Sold = self.S else: Sold = -1 vals, label = [0] * 3, {0:"M", 1:"N", 2:"E"} if hasattr(self, "_M") and self._M is not None: vals[0] = self.M if hasattr(self, "_N") and self._N is not None: vals[1] = self.N if hasattr(self, "_E") and self._E is not None: vals[2] = self.E idxmax = np.argmax(vals) if vals[idxmax] + 1 > S: RROMPyWarning(("Prescribed S is too small. Updating S to {} + " "1.").format(label[idxmax])) self.S = vals[idxmax] + 1 else: self._S = S self._approxParameters["S"] = self.S if Sold != self.S: self.resetSamples() - def __setupDenominator(self): + def _setupDenominator(self): """Compute Pade' denominator.""" if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of denominator.", timestamp = self.timestamp) while self.N > 0: TE = polyvander[self.polybasis](self.radiusPade(self.mus), self.E, scl = 1. / self.scaleFactor) TE = (TE.T * self.ws).T RHS = np.zeros(self.E + 1) RHS[-1] = 1. fitOut = customFit(TE.T, RHS, full = True, rcond = self.interpRcond) if self.verbosity >= 5: condfit = fitOut[1][2][0] / fitOut[1][2][-1] verbosityDepth("MAIN", ("Fitting {} samples with degree {} " "through {}... Conditioning of LS " "system: {:.4e}.").format( self.S, self.E, polyfitname[self.polybasis], condfit), timestamp = self.timestamp) if fitOut[1][1] < self.E + 1: Enew = fitOut[1][1] - 1 Nnew = min(self.N, Enew) Mnew = min(self.M, Enew) if Nnew == self.N: strN = "" else: strN = "N from {} to {} and ".format(self.N, Nnew) if Mnew == self.M: strM = "" else: strM = "M from {} to {} and ".format(self.M, Mnew) RROMPyWarning(("Polyfit is poorly conditioned.\nReducing {}{}" "E from {} to {}.").format(strN, strM, self.E, Enew)) newParams = {"N" : Nnew, "M" : Mnew, "E" : Enew} self.approxParameters = newParams continue mus_un, idx_un, cnt_un = np.unique(self.mus, return_inverse = True, return_counts = True) TE = polyvander[self.polybasis](self.radiusPade(self.mus), self.N, scl = 1. / self.scaleFactor) TE = (TE.T * self.ws).T if len(mus_un) == len(self.mus): Ghalf = (TE.T * fitOut[0]).T else: pseudoInv = np.zeros((len(self.mus), len(self.mus)), dtype = np.complex) for j in range(len(mus_un)): pseudoInv_loc = np.zeros((cnt_un[j], cnt_un[j]), dtype = np.complex) mask = np.arange(len(self.mus))[idx_un == j] for der in range(cnt_un[j]): fitderj = fitOut[0][mask[der]] pseudoInv_loc = (pseudoInv_loc + fitderj * np.diag(np.ones(1 + der), k = der - cnt_un[j] + 1)) I = np.ix_(mask, mask) pseudoInv[I] = np.flipud(pseudoInv_loc) Ghalf = pseudoInv.dot(TE) if self.POD: self.Ghalf = self.samplingEngine.RPOD.dot(Ghalf) ev, eV = self.findeveVGQR() else: self.Ghalf = self.samplingEngine.samples.dot(Ghalf) ev, eV = self.findeveVGExplicit() newParams = checkRobustTolerance(ev, self.E, self.robustTol) if not newParams: break self.approxParameters = newParams if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing denominator.", timestamp = self.timestamp) return eV[:, 0] - def __setupNumerator(self): + def _setupNumerator(self): """Compute Pade' numerator.""" if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of numerator.", timestamp = self.timestamp) Qevaldiag = np.diag(self.trainedModel.getQVal(self.mus)) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 mus_un, idx_un, cnt_un = np.unique(self.mus, return_inverse = True, return_counts = True) for j in range(len(mus_un)): if cnt_un[j] > 1: Q_loc = np.zeros((cnt_un[j], cnt_un[j]), dtype = np.complex) for der in range(1, cnt_un[j]): Qderj = (self.trainedModel.getQVal(mus_un[j], der, scl = 1. / self.scaleFactor) / fact(der)) Q_loc = Q_loc + Qderj * np.diag(np.ones(cnt_un[j] - der), k = - der) I = np.ix_(idx_un == j, idx_un == j) Qevaldiag[I] = Qevaldiag[I] + Q_loc self.trainedModel.verbosity = verb while self.M >= 0: fitVander = polyvander[self.polybasis](self.radiusPade(self.mus), self.M, scl = 1. / self.scaleFactor) fitOut = customFit(fitVander, Qevaldiag, w = self.ws, full = True, rcond = self.interpRcond) if self.verbosity >= 5: condfit = fitOut[1][2][0] / fitOut[1][2][-1] verbosityDepth("MAIN", ("Fitting {} samples with degree {} " "through {}... Conditioning of LS " "system: {:.4e}.").format( self.S, self.M, polyfitname[self.polybasis], condfit), timestamp = self.timestamp) if fitOut[1][1] == self.M + 1: P = fitOut[0].T break RROMPyWarning(("Polyfit is poorly conditioned. Reducing M from {} " "to {}. Exact snapshot interpolation not " "guaranteed.").format(self.M, fitOut[1][1] - 1)) self.M = fitOut[1][1] - 1 if self.M <= 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing numerator.", timestamp = self.timestamp) return np.atleast_2d(P) def setupApprox(self): """ Compute Pade' interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return modeAssert(self._mode, message = "Cannot setup approximant.") if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeScaleFactor() self.computeSnapshots() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, np.copy(self.samplingEngine.samples), self.HFEngine.rescalingExp) data.polytype = self.polybasis data.scaleFactor = self.scaleFactor data.mus = np.copy(self.mus) self.trainedModel.data = data else: self.trainedModel.data.projMat = np.copy( self.samplingEngine.samples) if self.N > 0: - Q = self.__setupDenominator() + Q = self._setupDenominator() else: Q = np.ones(1, dtype = np.complex) self.trainedModel.data.Q = np.copy(Q) - P = self.__setupNumerator() + P = self._setupNumerator() if self.POD: P = self.samplingEngine.RPOD.dot(P) self.trainedModel.data.P = np.copy(P) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def findeveVGExplicit(self, verbOutput : int = 5) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of Pade' denominator matrix. """ modeAssert(self._mode, message = "Cannot solve eigenvalue problem.") if self.verbosity >= 10: verbosityDepth("INIT", "Building gramian matrix.", timestamp = self.timestamp) self.G = self.HFEngine.innerProduct(self.Ghalf, self.Ghalf) if self.verbosity >= 10: verbosityDepth("DEL", "Done building gramian.", timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("INIT", ("Solving eigenvalue problem for gramian " "matrix."), timestamp = self.timestamp) ev, eV = np.linalg.eigh(self.G) if self.verbosity >= verbOutput: try: condev = ev[-1] / ev[0] except: condev = np.inf verbosityDepth("MAIN", ("Solved eigenvalue problem of size {} " "with condition number {:.4e}.").format( self.N + 1, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return ev, eV def findeveVGQR(self, verbOutput : int = 5) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of Pade' denominator matrix through SVD of R factor. """ if self.verbosity >= 7: verbosityDepth("INIT", ("Solving svd for square root of gramian " "matrix."), timestamp = self.timestamp) _, s, eV = np.linalg.svd(self.Ghalf, full_matrices = False) ev = s[::-1] eV = eV[::-1, :].T.conj() if self.verbosity >= verbOutput: try: condev = s[0] / s[-1] except: condev = np.inf verbosityDepth("MAIN", ("Solved svd problem of size {} x {} with " "condition number {:.4e}.").format( self.S, self.N + 1, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return ev, eV def radiusPade(self, mu:Np1D, mu0 : float = None) -> float: """ Compute translated radius to be plugged into Pade' approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.mu0. Returns: Translated radius to be plugged into Pade' approximant. """ return self.trainedModel.radiusPade(mu, mu0) def getResidues(self) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues() diff --git a/rrompy/reduction_methods/lagrange/approximant_lagrange_rb.py b/rrompy/reduction_methods/lagrange/approximant_lagrange_rb.py index a015bd6..978f620 100644 --- a/rrompy/reduction_methods/lagrange/approximant_lagrange_rb.py +++ b/rrompy/reduction_methods/lagrange/approximant_lagrange_rb.py @@ -1,212 +1,212 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import copy import numpy as np from .generic_approximant_lagrange import GenericApproximantLagrange from rrompy.reduction_methods.trained_model import TrainedModelRB as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.reduction_methods.base.rb_utils import projectAffineDecomposition from rrompy.utilities.base.types import Np1D, Np2D, List, Tuple, DictAny, HFEng from rrompy.utilities.base import purgeDict, verbosityDepth from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyException __all__ = ['ApproximantLagrangeRB'] class ApproximantLagrangeRB(GenericApproximantLagrange): """ ROM RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; defaults to 2; - 'sampler': sample point generator; defaults to uniform sampler on [0, 1]; - 'R': rank for Galerkin projection; defaults to S. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. ws: Array of snapshot weigths (unused). homogeneized: Whether to homogeneize Dirichlet BCs. approxRadius: Dummy radius of approximant (i.e. distance from mu0 to farthest sample point). approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'R': rank for Galerkin projection. extraApproxParameters: List of approxParameters keys in addition to mother class's. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. R: Rank for Galerkin projection. POD: Whether to compute POD of snapshots. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix wrt theta(mu). bs: List of numpy vectors representing coefficients of linear system RHS wrt theta(mu). thetaAs: List of callables representing coefficients of linear system matrix wrt mu. thetabs: List of callables representing coefficients of linear system RHS wrt mu. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix wrt theta(mu). bRBs: List of numpy vectors representing coefficients of compressed linear system RHS wrt theta(mu). """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0., approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["R"]) + self._preInit() + self._addParametersToList(["R"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) if self.verbosity >= 10: verbosityDepth("INIT", "Computing affine blocks of system.", timestamp = self.timestamp) self.As = self.HFEngine.affineLinearSystemA(self.mu0) self.bs = self.HFEngine.affineLinearSystemb(self.mu0, self.homogeneized) if self.verbosity >= 10: verbosityDepth("DEL", "Done computing affine blocks.", timestamp = self.timestamp) - self.__postInit() + self._postInit() @property def approxParameters(self): """ Value of approximant parameters. Its assignment may change M, N and S. """ return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["R"], True, True, baselevel = 1) GenericApproximantLagrange.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) if "R" in keyList: self.R = approxParameters["R"] elif hasattr(self, "_R") and self._R is not None: self.R = self.R else: self.R = self.S @property def R(self): """Value of R. Its assignment may change S.""" return self._R @R.setter def R(self, R): if R < 0: raise RROMPyException("R must be non-negative.") self._R = R self._approxParameters["R"] = self.R if hasattr(self, "_S") and self.S < self.R: RROMPyWarning("Prescribed S is too small. Updating S to R.") self.S = self.R def setupApprox(self): """Compute RB projection matrix.""" if self.checkComputedApprox(): return if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeSnapshots() if self.verbosity >= 7: verbosityDepth("INIT", "Computing projection matrix.", timestamp = self.timestamp) if self.POD: U, _, _ = np.linalg.svd(self.samplingEngine.RPOD, full_matrices = False) pMat = self.samplingEngine.samples.dot(U[:, : self.R]) else: pMat = self.samplingEngine.samples[:, : self.R] if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, np.copy(pMat), self.HFEngine.rescalingExp) data.thetaAs = self.HFEngine.affineWeightsA(self.mu0) data.thetabs = self.HFEngine.affineWeightsb(self.mu0, self.homogeneized) data.ARBs, data.bRBs = self.assembleReducedSystem(pMat) data.mus = np.copy(self.mus) self.trainedModel.data = data else: pMatOld = self.trainedModel.data.projMat Sold = pMatOld.shape[1] ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMatOld) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs self.trainedModel.data.projMat = np.copy(pMat) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing projection matrix.", timestamp = self.timestamp) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def assembleReducedSystem(self, pMat : Np2D = None, pMatOld : Np2D = None)\ -> Tuple[List[Np2D], List[Np1D]]: """Build affine blocks of RB linear system through projections.""" if pMat is None: self.setupApprox() ARBs = self.trainedModel.data.ARBs bRBs = self.trainedModel.data.bRBs else: if self.verbosity >= 10: verbosityDepth("INIT", "Projecting affine terms of HF model.", timestamp = self.timestamp) ARBsOld = None if pMatOld is None else self.trainedModel.data.ARBs bRBsOld = None if pMatOld is None else self.trainedModel.data.bRBs ARBs, bRBs = projectAffineDecomposition(self.As, self.bs, pMat, ARBsOld, bRBsOld, pMatOld) if self.verbosity >= 10: verbosityDepth("DEL", "Done projecting affine terms.", timestamp = self.timestamp) return ARBs, bRBs diff --git a/rrompy/reduction_methods/lagrange/generic_approximant_lagrange.py b/rrompy/reduction_methods/lagrange/generic_approximant_lagrange.py index 0126910..b5239b7 100644 --- a/rrompy/reduction_methods/lagrange/generic_approximant_lagrange.py +++ b/rrompy/reduction_methods/lagrange/generic_approximant_lagrange.py @@ -1,195 +1,195 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.utilities.base.types import DictAny, HFEng from rrompy.utilities.base import purgeDict, verbosityDepth from rrompy.utilities.exception_manager import RROMPyException, modeAssert __all__ = ['GenericApproximantLagrange'] class GenericApproximantLagrange(GenericApproximant): """ ROM Lagrange interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. ws: Array of snapshot weigths. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'S': total number of snapshots current approximant relies upon. extraApproxParameters: List of approxParameters keys in addition to mother class's. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. POD: Whether to compute POD of snapshots. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0., approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["S"]) + self._preInit() + self._addParametersToList(["S"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) from rrompy.utilities.parameter_sampling import QuadratureSampler self.sampler = QuadratureSampler([0., 1.], "UNIFORM") del QuadratureSampler - self.__postInit() + self._postInit() def setupSampling(self): """Setup sampling engine.""" modeAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: from rrompy.sampling.linear_problem.sampling_engine_lagrange_pod \ import SamplingEngineLagrangePOD super().setupSampling(SamplingEngineLagrangePOD) else: from rrompy.sampling.linear_problem.sampling_engine_lagrange \ import SamplingEngineLagrange super().setupSampling(SamplingEngineLagrange) @property def mus(self): """Value of mus. Its assignment may reset snapshots.""" return self._mus @mus.setter def mus(self, mus): musOld = self.mus if hasattr(self, '_mus') else None self._mus = np.array(mus) # _, musCounts = np.unique(self._mus, return_counts = True) # if len(np.where(musCounts > 1)[0]) > 0: # raise RROMPyException("Repeated sample points not allowed.") if (musOld is None or len(self.mus) != len(musOld) or not np.allclose(self.mus, musOld, 1e-14)): self.resetSamples() self.autoNode = None @property def approxParameters(self): """Value of approximant parameters. Its assignment may change S.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["S"], True, True, baselevel = 1) GenericApproximant.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) if "S" in keyList: self.S = approxParameters["S"] elif not hasattr(self, "_S") or self._S is None: self.S = 2 @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if S <= 0: raise RROMPyException("S must be positive.") if hasattr(self, "_S") and self._S is not None: Sold = self.S else: Sold = -1 self._S = S self._approxParameters["S"] = self.S if Sold != self.S: self.resetSamples() @property def sampler(self): """Value of sampler.""" return self._sampler @sampler.setter def sampler(self, sampler): if 'generatePoints' not in dir(sampler): raise RROMPyException("Sampler type not recognized.") if hasattr(self, '_sampler') and self._sampler is not None: samplerOld = self.sampler self._sampler = sampler self._approxParameters["sampler"] = self.sampler.__str__() if not 'samplerOld' in locals() or samplerOld != self.sampler: self.resetSamples() def computeSnapshots(self): """Compute snapshots of solution map.""" modeAssert(self._mode, message = "Cannot start snapshot computation.") if self.samplingEngine.samples is None: if self.verbosity >= 5: verbosityDepth("INIT", "Starting computation of snapshots.", timestamp = self.timestamp) self.mus, self.ws = self.sampler.generatePoints(self.S) self.samplingEngine.iterSample(self.mus, homogeneized = self.homogeneized) if self.verbosity >= 5: verbosityDepth("DEL", "Done computing snapshots.", timestamp = self.timestamp) def normApprox(self, mu:complex, homogeneized : bool = False) -> float: """ Compute norm of approximant at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Target norm of approximant. """ if not self.POD or self.homogeneized != homogeneized: return super().normApprox(mu, homogeneized) return np.linalg.norm(self.getApproxReduced(mu)) def computeScaleFactor(self): """Compute parameter rescaling factor.""" modeAssert(self._mode, message = "Cannot compute rescaling factor.") self.scaleFactor = .5 * np.abs(np.power(self.sampler.lims[0], self.HFEngine.rescalingExp) - np.power(self.sampler.lims[1], self.HFEngine.rescalingExp)) diff --git a/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_pade.py b/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_pade.py index 264816f..b37d5da 100644 --- a/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_pade.py +++ b/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_pade.py @@ -1,549 +1,549 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import copy import numpy as np from scipy.special import factorial as fact from .generic_approximant_lagrange_greedy import ( GenericApproximantLagrangeGreedy) from rrompy.utilities.poly_fitting import (polybases, polyvander, polydomcoeff, polyfitname, customFit) from rrompy.reduction_methods.lagrange import ApproximantLagrangePade from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import DictAny, List, HFEng from rrompy.utilities.base import purgeDict, verbosityDepth from rrompy.utilities.exception_manager import RROMPyWarning from rrompy.utilities.exception_manager import RROMPyException __all__ = ['ApproximantLagrangePadeGreedy'] class ApproximantLagrangePadeGreedy(GenericApproximantLagrangeGreedy, ApproximantLagrangePade): """ ROM greedy Pade' interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'muBounds': list of bounds for parameter values; defaults to [[0], [1]]; - 'basis': type of basis for interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'Delta': difference between M and N in rational approximant; defaults to 0; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'errorEstimatorKind': kind of error estimator; available values include 'EXACT', 'SIMPLIFIED', 'BASIC', and 'BARE'; defaults to 'SIMPLIFIED'; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of training points to be exhausted before training set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to maxIter / refinementRatio; - 'nTrainPoints': number of starting training points; defaults to 1; - 'trainSetGenerator': training sample points generator; defaults to Chebyshev sampler within muBounds; - 'testSetGenerator': test sample points generator; defaults to uniform sampler within muBounds; - 'interpRcond': tolerance for interpolation via numpy.polyfit; defaults to None; - 'robustTol': tolerance for robust Pade' denominator management; defaults to 0. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'muBounds': list of bounds for parameter values; - 'basis': type of basis for interpolation; - 'Delta': difference between M and N in rational approximant; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'errorEstimatorKind': kind of error estimator; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of training points to be exhausted before training set refinement; - 'nTestPoints': number of test points; - 'nTrainPoints': number of starting training points; - 'trainSetGenerator': training sample points generator; - 'testSetGenerator': test sample points generator; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust Pade' denominator management. extraApproxParameters: List of approxParameters keys in addition to mother class's. POD: whether to compute POD of snapshots. muBounds: list of bounds for parameter values. greedyTol: uniform error tolerance for greedy algorithm. errorEstimatorKind: kind of error estimator. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTrainPoints: number of test points. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. testSetGenerator: test sample points generator. interpRcond: Tolerance for interpolation via numpy.polyfit. robustTol: Tolerance for robust Pade' denominator management. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. """ _allowedEstimatorKinds = ["EXACT", "SIMPLIFIED", "BASIC", "BARE"] def __init__(self, HFEngine:HFEng, mu0 : complex = 0., approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["polybasis", "Delta", "errorEstimatorKind", + self._preInit() + self._addParametersToList(["polybasis", "Delta", "errorEstimatorKind", "interpRcond", "robustTol"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) if self.verbosity >= 7: verbosityDepth("INIT", "Computing Taylor blocks of system.", timestamp = self.timestamp) nAs = self.HFEngine.nAs - 1 nbs = max(self.HFEngine.nbs, (nAs + 1) * self.homogeneized) self.As = [self.HFEngine.A(self.mu0, j + 1) for j in range(nAs)] self.bs = [self.HFEngine.b(self.mu0, j, self.homogeneized) for j in range(nbs)] if self.verbosity >= 7: verbosityDepth("DEL", "Done computing Taylor blocks.", timestamp = self.timestamp) - self.__postInit() + self._postInit() @property def approxParameters(self): """ Value of approximant parameters. Its assignment may change robustTol. """ return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["polybasis", "Delta", "errorEstimatorKind", "interpRcond", "robustTol"], True, True, baselevel = 1) if "Delta" in list(approxParameters.keys()): self._Delta = approxParameters["Delta"] elif not hasattr(self, "_Delta") or self._Delta is None: self._Delta = 0 GenericApproximantLagrangeGreedy.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) self.Delta = self.Delta if "polybasis" in keyList: self.polybasis = approxParameters["polybasis"] elif not hasattr(self, "_polybasis") or self._polybasis is None: self.polybasis = "MONOMIAL" if "errorEstimatorKind" in keyList: self.errorEstimatorKind = approxParameters["errorEstimatorKind"] elif (not hasattr(self, "_errorEstimatorKind") or self.errorEstimatorKind is None): self.errorEstimatorKind = "SIMPLIFIED" if "interpRcond" in keyList: self.interpRcond = approxParameters["interpRcond"] elif not hasattr(self, "interpRcond") or self.interpRcond is None: self.interpRcond = None if "robustTol" in keyList: self.robustTol = approxParameters["robustTol"] elif not hasattr(self, "_robustTol") or self._robustTol is None: self.robustTol = 0 @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def Delta(self): """Value of Delta.""" return self._Delta @Delta.setter def Delta(self, Delta): if not np.isclose(Delta, np.floor(Delta)): raise RROMPyException("Delta must be an integer.") if Delta < 0: RROMPyWarning(("Error estimator unreliable for Delta < 0. " "Overloading of errorEstimator is suggested.")) else: Deltamin = (max(self.HFEngine.nbs, self.HFEngine.nAs * self.homogeneized) - 1 - 1 * (self.HFEngine.nAs > 1)) if Delta < Deltamin: RROMPyWarning(("Method may be unreliable for selected Delta. " "Suggested minimal value of Delta: {}.").format( Deltamin)) self._Delta = Delta self._approxParameters["Delta"] = self.Delta @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " "to 'SIMPLIFIED'.")) errorEstimatorKind = "SIMPLIFIED" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= np.abs(self.Delta): RROMPyWarning(("nTestPoints must be at least abs(Delta) + 1. " "Increasing value to abs(Delta) + 1.")) nTestPoints = np.abs(self.Delta) + 1 if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() def errorEstimator(self, mus:List[np.complex]) -> List[np.complex]: """Standard residual-based error estimator.""" self.setupApprox() PM = self.trainedModel.data.P[:, -1] if np.any(np.isnan(PM)) or np.any(np.isinf(PM)): err = np.empty(len(mus)) err[:] = np.inf return err nAs = self.HFEngine.nAs - 1 nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized) muRTest = self.radiusPade(mus) muRTrain = self.radiusPade(self.mus) nodalVals = np.prod(np.tile(muRTest.reshape(-1, 1), [1, self.S]) - muRTrain.reshape(1, -1), axis = 1) denVals = self.trainedModel.getQVal(mus) self.assembleReducedResidualBlocks(kind = self.errorEstimatorKind) vanderBase = np.polynomial.polynomial.polyvander(muRTest, max(nAs, nbs)).T radiusb0 = vanderBase[: nbs + 1, :] # 'ij,jk,ik->k', resbb, radiusb0, radiusb0.conj() b0resb0 = np.sum(self.trainedModel.data.resbb.dot(radiusb0) * radiusb0.conj(), axis = 0) RHSnorms = np.power(np.abs(b0resb0), .5) if self.errorEstimatorKind == "BARE": self.assembleReducedResidualGramian(self.trainedModel.data.projMat) pDom = self.trainedModel.data.P[:, -1] LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom)) Adiag = self.As[0].diagonal() LL = ((self.scaleFactor * np.linalg.norm(Adiag)) ** 2. / np.size(Adiag) * LL) elif self.errorEstimatorKind == "BASIC": pDom = self.trainedModel.data.P[:, -1] LL = pDom.conj().dot(self.trainedModel.data.resAA.dot(pDom)) else: vanderBase = vanderBase[: -1, :] delta = self.S - len(self.trainedModel.data.Q) nbsEff = max(0, nbs - delta) if self.errorEstimatorKind == "SIMPLIFIED": radiusA = np.tensordot(PM, vanderBase[: nAs, :], 0) if delta == 0: radiusb = (np.abs(self.trainedModel.data.Q[-1]) * radiusb0[: -1, :]) else: #if self.errorEstimatorKind == "EXACT": momentQ = np.zeros(nbsEff, dtype = np.complex) momentQu = np.zeros((self.S, nAs), dtype = np.complex) radiusbTen = np.zeros((nbsEff, nbsEff, len(mus)), dtype = np.complex) radiusATen = np.zeros((nAs, nAs, len(mus)), dtype = np.complex) if nbsEff > 0: momentQ[0] = self.trainedModel.data.Q[-1] radiusbTen[0, :, :] = vanderBase[: nbsEff, :] momentQu[:, 0] = self.trainedModel.data.P[:, -1] radiusATen[0, :, :] = vanderBase[: nAs, :] Qvals = self.trainedModel.getQVal(self.mus) for k in range(1, max(nAs, nbs * (nbsEff > 0))): Qvals = Qvals * muRTrain if k > delta and k < nbs: momentQ[k - delta] = self._fitinv.dot(Qvals) radiusbTen[k - delta, k :, :] = ( radiusbTen[0, : delta - k, :]) if k < nAs: momentQu[:, k] = Qvals * self._fitinv radiusATen[k, k :, :] = radiusATen[0, : - k, :] if self.POD and nAs > 1: momentQu[:, 1 :] = self.samplingEngine.RPOD.dot( momentQu[:, 1 :]) radiusA = np.tensordot(momentQu, radiusATen, 1) if nbsEff > 0: radiusb = np.tensordot(momentQ, radiusbTen, 1) if ((self.errorEstimatorKind == "SIMPLIFIED" and delta == 0) or (self.errorEstimatorKind == "EXACT" and nbsEff > 0)): # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb[delta + 1 :, delta + 1 :]\ .dot(radiusb) * radiusb.conj(), axis = 0) # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot( self.trainedModel.data.resAb[delta :, :, :], radiusA, 2) * radiusb.conj(), axis = 0) else: ff, Lf = 0., 0. if self.errorEstimatorKind not in ["BARE", "BASIC"]: # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2) * radiusA.conj(), axis = (0, 1)) jOpt = np.power(np.abs(ff - 2. * np.real(Lf) + LL), .5) return (polydomcoeff[self.polybasis](self.S - 1) * jOpt * np.abs(nodalVals / denVals) / RHSnorms) - def __setupDenominator(self): + def _setupDenominator(self): """Compute Pade' denominator.""" if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of denominator.", timestamp = self.timestamp) TS = polyvander[self.polybasis](self.radiusPade(self.mus),self.S - 1).T RHS = np.zeros(self.S) RHS[-1] = 1. fitOut = customFit(TS, RHS, full = True, rcond = self.interpRcond) if self.verbosity >= 2: condfit = fitOut[1][2][0] / fitOut[1][2][-1] verbosityDepth("MAIN", ("Fitting {} samples with degree {} " "through {}... Conditioning of system: " "{:.4e}.").format(self.S, self.S - 1, polyfitname[self.polybasis], condfit), timestamp = self.timestamp) if fitOut[1][1] < self.S: RROMPyWarning(("Polyfit is poorly conditioned. Starting " "preemptive termination of computation of " "approximant.")) Q = np.empty(max(self.N, 0) + 1, dtype = np.complex) P = np.empty((len(self.mus), max(self.M, 0) + 1), dtype = np.complex) Q[:] = np.nan P[:] = np.nan self.trainedModel.data.Q = np.copy(Q) self.trainedModel.data.P = np.copy(P) self.trainedModel.data.approxParameters = copy( self.approxParameters) if self.verbosity >= 7: verbosityDepth("DEL", "Aborting computation of denominator.", timestamp = self.timestamp) return self._fitinv = fitOut[0] while self.N > 0: Ghalf = (TS[: self.N + 1, :] * self._fitinv).T if self.POD: self.Ghalf = self.samplingEngine.RPOD.dot(Ghalf) ev, eV = self.findeveVGQR(2) else: self.Ghalf = self.samplingEngine.samples.dot(Ghalf) ev, eV = self.findeveVGQR(2) Nstable = np.sum(np.abs(ev) >= self.robustTol * np.linalg.norm(ev)) if self.N <= Nstable: break if self.verbosity >= 2: verbosityDepth("MAIN", ("Smallest {} eigenvalues below " "tolerance. Reducing N to {}.")\ .format(self.N - Nstable + 1, Nstable), timestamp = self.timestamp) self._N = Nstable if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing denominator.", timestamp = self.timestamp) return eV[:, 0] - def __setupNumerator(self): + def _setupNumerator(self): """Compute Pade' numerator.""" if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of numerator.", timestamp = self.timestamp) Qevaldiag = np.diag(self.trainedModel.getQVal(self.mus)) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 mus_un, idx_un, cnt_un = np.unique(self.mus, return_inverse = True, return_counts = True) for j in range(len(mus_un)): if cnt_un[j] > 1: Q_loc = np.zeros((cnt_un[j], cnt_un[j]), dtype = np.complex) for der in range(1, cnt_un[j]): Qderj = (self.trainedModel.getQVal(mus_un[j], der) / fact(der)) Q_loc = Q_loc + Qderj * np.diag(np.ones(cnt_un[j] - der), k = - der) I = idx_un == j I = np.arange(len(self.mus))[I] I = np.ix_(I, I) Qevaldiag[I] = Qevaldiag[I] + Q_loc self.trainedModel.verbosity = verb while self.M >= 0: fitVander = polyvander[self.polybasis](self.radiusPade(self.mus), self.M) w = None if self.M == self.S - 1: w = "AUTO" fitOut = customFit(fitVander, Qevaldiag, full = True, w = w, rcond = self.interpRcond) if self.verbosity >= 2: condfit = fitOut[1][2][0] / fitOut[1][2][-1] verbosityDepth("MAIN", ("Fitting {} samples with degree {} " "through {}... Conditioning of " "system: {:.4e}.").format( self.S, self.M, polyfitname[self.polybasis], condfit), timestamp = self.timestamp) if fitOut[1][1] == self.M + 1: P = fitOut[0].T break RROMPyWarning(("Polyfit is poorly conditioned. Reducing M from {} " "to {}. Exact snapshot interpolation not " "guaranteed.").format(self.M, fitOut[1][1] - 1)) self._M = fitOut[1][1] - 1 if self.M < 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) return np.atleast_2d(P) def setupApprox(self, plotEst : bool = False): """ Compute Pade' interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeScaleFactor() self.greedy(plotEst) self._M = self.S - 1 self._N = self.S - 1 if self.Delta < 0: self._M += self.Delta else: self._N -= self.Delta if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, np.copy(self.samplingEngine.samples), self.HFEngine.rescalingExp) data.polytype = self.polybasis data.scaleFactor = self.scaleFactor data.mus = np.copy(self.mus) self.trainedModel.data = data else: self.trainedModel.data.projMat = np.copy( self.samplingEngine.samples) self.trainedModel.data.mus = np.copy(self.mus) if min(self.M, self.N) < 0: if self.verbosity >= 5: verbosityDepth("MAIN", "Minimal sample size not achieved.", timestamp = self.timestamp) Q = np.empty(max(self.N, 0) + 1, dtype = np.complex) P = np.empty((len(self.mus), max(self.M, 0) + 1), dtype = np.complex) Q[:] = np.nan P[:] = np.nan self.trainedModel.data.Q = np.copy(Q) self.trainedModel.data.P = np.copy(P) self.trainedModel.data.approxParameters = copy( self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Aborting computation of approximant.", timestamp = self.timestamp) return if self.N > 0: - Q = self.__setupDenominator() + Q = self._setupDenominator() if Q is None: if self.verbosity >= 5: verbosityDepth("DEL", "Aborting computation of approximant.", timestamp = self.timestamp) return else: Q = np.ones((1,), dtype = np.complex) self.trainedModel.data.Q = np.copy(Q) - P = self.__setupNumerator() + P = self._setupNumerator() if self.POD: P = self.samplingEngine.RPOD.dot(P) self.trainedModel.data.P = np.copy(P) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing numerator.", timestamp = self.timestamp) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def assembleReducedResidualBlocks(self, kind : str = "EXACT"): """Build affine blocks of reduced linear system through projections.""" pMat = self.trainedModel.data.projMat scaling = self.trainedModel.data.scaleFactor self.assembleReducedResidualBlocksbb(self.bs, pMat, scaling) if kind in ["EXACT", "SIMPLIFIED"]: self.assembleReducedResidualBlocksAb(self.As, self.bs[1 :], pMat, scaling) if kind != "BARE": self.assembleReducedResidualBlocksAA(self.As, pMat, scaling, basic = (kind == "BASIC")) diff --git a/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_rb.py b/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_rb.py index de3a86b..b4a8720 100644 --- a/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_rb.py +++ b/rrompy/reduction_methods/lagrange_greedy/approximant_lagrange_greedy_rb.py @@ -1,250 +1,250 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import copy from .generic_approximant_lagrange_greedy import ( GenericApproximantLagrangeGreedy) from rrompy.reduction_methods.lagrange import ApproximantLagrangeRB from rrompy.reduction_methods.trained_model import TrainedModelRB as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import DictAny, HFEng, List from rrompy.utilities.base import verbosityDepth from rrompy.utilities.exception_manager import RROMPyException __all__ = ['ApproximantLagrangeRBGreedy'] class ApproximantLagrangeRBGreedy(GenericApproximantLagrangeGreedy, ApproximantLagrangeRB): """ ROM greedy RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'muBounds': list of bounds for parameter values; defaults to [[0], [1]]; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of training points to be exhausted before training set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to maxIter / refinementRatio; - 'nTrainPoints': number of starting training points; defaults to 1; - 'trainSetGenerator': training sample points generator; defaults to Chebyshev sampler within muBounds; - 'testSetGenerator': test sample points generator; defaults to uniform sampler within muBounds. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'muBounds': list of bounds for parameter values; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of training points to be exhausted before training set refinement; - 'nTestPoints': number of test points; - 'nTrainPoints': number of starting training points; - 'trainSetGenerator': training sample points generator; - 'testSetGenerator': test sample points generator; - 'robustTol': tolerance for robust Pade' denominator management. extraApproxParameters: List of approxParameters keys in addition to mother class's. POD: whether to compute POD of snapshots. muBounds: list of bounds for parameter values. greedyTol: uniform error tolerance for greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTrainPoints: number of test points. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. testSetGenerator: test sample points generator. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix wrt theta(mu). bs: List of numpy vectors representing coefficients of linear system RHS wrt theta(mu). thetaAs: List of callables representing coefficients of linear system matrix wrt mu. thetabs: List of callables representing coefficients of linear system RHS wrt mu. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix wrt theta(mu). bRBs: List of numpy vectors representing coefficients of compressed linear system RHS wrt theta(mu). """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0., approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() + self._preInit() super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) if self.verbosity >= 10: verbosityDepth("INIT", "Computing affine blocks of system.", timestamp = self.timestamp) self.As = self.HFEngine.affineLinearSystemA(self.mu0) self.bs = self.HFEngine.affineLinearSystemb(self.mu0, self.homogeneized) if self.verbosity >= 10: verbosityDepth("DEL", "Done computing affine blocks.", timestamp = self.timestamp) - self.__postInit() + self._postInit() @property def R(self): """Value of R.""" return self._S @R.setter def R(self, R): raise RROMPyException(("R is used just to simplify inheritance, and " "its value cannot be changed from that of S.")) def errorEstimator(self, mus:List[np.complex]) -> List[np.complex]: """ Standard residual-based error estimator. Unreliable for unstable problems (inf-sup constant is missing). """ self.setupApprox() self.assembleReducedResidualBlocks() nmus = len(mus) nAs = self.trainedModel.data.resAA.shape[1] nbs = self.trainedModel.data.resbb.shape[0] thetaAs = self.trainedModel.data.thetaAs thetabs = self.trainedModel.data.thetabs radiusA = np.empty((self.S, nAs, nmus), dtype = np.complex) radiusb = np.empty((nbs, nmus), dtype = np.complex) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 if verb >= 5: mustr = mus if nmus > 2: mustr = "[{} ..({}).. {}]".format(mus[0], nmus - 2, mus[-1]) verbosityDepth("INIT", ("Computing RB solution at mu = " "{}.").format(mustr), timestamp = self.timestamp) for j in range(nmus): mu = mus[j] uApp = self.getApproxReduced(mu) for i in range(nAs): radiusA[:, i, j] = eval(thetaAs[i]) * uApp for i in range(nbs): radiusb[i, j] = eval(thetabs[i]) if verb >= 5: verbosityDepth("DEL", "Done computing RB solution.", timestamp = self.timestamp) self.trainedModel.verbosity = verb # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(radiusb) * radiusb.conj(), axis = 0) # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, radiusA, 2) * radiusb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2) * radiusA.conj(), axis = (0, 1)) return np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 def setupApprox(self, plotEst : bool = False): """Compute RB projection matrix.""" if self.checkComputedApprox(): return if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.greedy(plotEst) if self.verbosity >= 7: verbosityDepth("INIT", "Computing projection matrix.", timestamp = self.timestamp) pMat = self.samplingEngine.samples if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, np.copy(pMat), self.HFEngine.rescalingExp) data.thetaAs = self.HFEngine.affineWeightsA(self.mu0) data.thetabs = self.HFEngine.affineWeightsb(self.mu0, self.homogeneized) data.ARBs, data.bRBs = self.assembleReducedSystem(pMat) data.mus = np.copy(self.mus) self.trainedModel.data = data else: pMatOld = self.trainedModel.data.projMat Sold = pMatOld.shape[1] ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMatOld) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs self.trainedModel.data.projMat = np.copy(pMat) self.trainedModel.data.mus = np.copy(self.mus) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing projection matrix.", timestamp = self.timestamp) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def assembleReducedResidualBlocks(self): """Build affine blocks of RB linear system through projections.""" computeResbb = not hasattr(self.trainedModel.data, "resbb") computeResAb = (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb.shape[1] != self.S) computeResAA = (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA.shape[0] != self.S) if computeResbb or computeResAb or computeResAA: pMat = self.trainedModel.data.projMat if self.verbosity >= 7: verbosityDepth("INIT", "Projecting affine terms of residual.", timestamp = self.timestamp) if computeResbb: self.assembleReducedResidualBlocksbb(self.bs, pMat) if computeResAb: self.assembleReducedResidualBlocksAb(self.As, self.bs, pMat) if computeResAA: self.assembleReducedResidualBlocksAA(self.As, pMat) if self.verbosity >= 7: verbosityDepth("DEL", ("Done setting up affine decomposition " "of residual."), timestamp = self.timestamp) diff --git a/rrompy/reduction_methods/lagrange_greedy/generic_approximant_lagrange_greedy.py b/rrompy/reduction_methods/lagrange_greedy/generic_approximant_lagrange_greedy.py index bf244f5..e3a8796 100644 --- a/rrompy/reduction_methods/lagrange_greedy/generic_approximant_lagrange_greedy.py +++ b/rrompy/reduction_methods/lagrange_greedy/generic_approximant_lagrange_greedy.py @@ -1,721 +1,721 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.reduction_methods.lagrange.generic_approximant_lagrange import ( GenericApproximantLagrange) from rrompy.utilities.base.types import Np1D, Np2D, DictAny, HFEng, Tuple, List from rrompy.utilities.base import purgeDict, verbosityDepth from rrompy.utilities.exception_manager import (RROMPyException, modeAssert, RROMPyWarning) __all__ = ['GenericApproximantLagrangeGreedy'] class GenericApproximantLagrangeGreedy(GenericApproximantLagrange): """ ROM greedy Lagrange interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'muBounds': list of bounds for parameter values; defaults to [[0], [1]]; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to maxIter / refinementRatio; - 'nTrainPoints': number of starting training points; defaults to 1; - 'trainSetGenerator': training sample points generator; defaults to Chebyshev sampler within muBounds; - 'testSetGenerator': test sample points generator; defaults to uniform sampler within muBounds. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'muBounds': list of bounds for parameter values; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'nTrainPoints': number of starting training points; - 'trainSetGenerator': training sample points generator; - 'testSetGenerator': test sample points generator. extraApproxParameters: List of approxParameters keys in addition to mother class's. POD: whether to compute POD of snapshots. muBounds: list of bounds for parameter values. greedyTol: uniform error tolerance for greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTrainPoints: number of test points. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. testSetGenerator: test sample points generator. robustTol: tolerance for robust Pade' denominator management. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. """ TOL_INSTABILITY = 1e-6 def __init__(self, HFEngine:HFEng, mu0 : complex = 0., approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["muBounds", "greedyTol", "interactive", + self._preInit() + self._addParametersToList(["muBounds", "greedyTol", "interactive", "maxIter", "refinementRatio", "nTestPoints", "nTrainPoints", "trainSetGenerator", "testSetGenerator"]) super(GenericApproximantLagrange, self).__init__( HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) - self.__postInit() + self._postInit() @property def approxParameters(self): """Value of approximant parameters. Its assignment may change S.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["muBounds", "greedyTol", "interactive", "maxIter", "refinementRatio", "nTestPoints", "nTrainPoints", "trainSetGenerator", "testSetGenerator"], True, True, baselevel = 1) GenericApproximant.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) if "muBounds" in keyList: self.muBounds = approxParameters["muBounds"] elif not hasattr(self, "_muBounds") or self.muBounds is None: self.muBounds = [[0.], [1.]] if "greedyTol" in keyList: self.greedyTol = approxParameters["greedyTol"] elif not hasattr(self, "_greedyTol") or self.greedyTol is None: self.greedyTol = 1e-2 if "interactive" in keyList: self.interactive = approxParameters["interactive"] elif not hasattr(self, "interactive") or self.interactive is None: self.interactive = False if "maxIter" in keyList: self.maxIter = approxParameters["maxIter"] elif not hasattr(self, "_maxIter") or self.maxIter is None: self.maxIter = 1e2 if "refinementRatio" in keyList: self.refinementRatio = approxParameters["refinementRatio"] elif (not hasattr(self, "_refinementRatio") or self.refinementRatio is None): self.refinementRatio = 0.2 if "nTestPoints" in keyList: self.nTestPoints = approxParameters["nTestPoints"] elif (not hasattr(self, "_nTestPoints") or self.nTestPoints is None): self.nTestPoints = np.int(np.ceil(self.maxIter / self.refinementRatio)) if "nTrainPoints" in keyList: self.nTrainPoints = approxParameters["nTrainPoints"] elif not hasattr(self, "_nTrainPoints") or self.nTrainPoints is None: self.nTrainPoints = 1 if "trainSetGenerator" in keyList: self.trainSetGenerator = approxParameters["trainSetGenerator"] elif (not hasattr(self, "_trainSetGenerator") or self.trainSetGenerator is None): from rrompy.utilities.parameter_sampling import QuadratureSampler self.trainSetGenerator = QuadratureSampler(self.muBounds, "CHEBYSHEV") del QuadratureSampler if "testSetGenerator" in keyList: self.testSetGenerator = approxParameters["testSetGenerator"] elif (not hasattr(self, "_testSetGenerator") or self.testSetGenerator is None): from rrompy.utilities.parameter_sampling import QuadratureSampler self.testSetGenerator = QuadratureSampler(self.muBounds, "UNIFORM") del QuadratureSampler @property def S(self): """Value of S.""" if not hasattr(self, "_mus") or self.mus is None: return 0 return len(self.mus) @S.setter def S(self, S): raise RROMPyException(("S is used just to simplify inheritance, and " "its value cannot be changed.")) @property def mus(self): """Value of mus.""" return self._mus @mus.setter def mus(self, mus): self._mus = np.array(mus, dtype = np.complex) @property def muBounds(self): """Value of muBounds.""" return self._muBounds @muBounds.setter def muBounds(self, muBounds): if len(muBounds) != 2: raise RROMPyException("2 limits must be specified.") try: muBounds = muBounds.tolist() except: muBounds = list(muBounds) for j in range(2): try: len(muBounds[j]) except: muBounds[j] = np.array([muBounds[j]]) if len(muBounds[0]) != len(muBounds[1]): raise RROMPyException("The bounds must have the same length.") self._muBounds = muBounds @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def refinementRatio(self): """Value of refinementRatio.""" return self._refinementRatio @refinementRatio.setter def refinementRatio(self, refinementRatio): if refinementRatio <= 0. or refinementRatio > 1.: raise RROMPyException(("refinementRatio must be between 0 " "(excluded) and 1.")) if (hasattr(self, "_refinementRatio") and self.refinementRatio is not None): refinementRatioold = self.refinementRatio else: refinementRatioold = -1 self._refinementRatio = refinementRatio self._approxParameters["refinementRatio"] = self.refinementRatio if refinementRatioold != self.refinementRatio: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def nTrainPoints(self): """Value of nTrainPoints.""" return self._nTrainPoints @nTrainPoints.setter def nTrainPoints(self, nTrainPoints): if nTrainPoints <= 1: raise RROMPyException("nTrainPoints must be greater than 1.") if not np.isclose(nTrainPoints, np.int(nTrainPoints)): raise RROMPyException("nTrainPoints must be an integer.") nTrainPoints = np.int(nTrainPoints) if (hasattr(self, "_nTrainPoints") and self.nTrainPoints is not None): nTrainPointsOld = self.nTrainPoints else: nTrainPointsOld = -1 self._nTrainPoints = nTrainPoints self._approxParameters["nTrainPoints"] = self.nTrainPoints if nTrainPointsOld != self.nTrainPoints: self.resetSamples() @property def trainSetGenerator(self): """Value of trainSetGenerator.""" return self._trainSetGenerator @trainSetGenerator.setter def trainSetGenerator(self, trainSetGenerator): if 'generatePoints' not in dir(trainSetGenerator): raise RROMPyException("trainSetGenerator type not recognized.") if (hasattr(self, '_trainSetGenerator') and self.trainSetGenerator is not None): trainSetGeneratorOld = self.trainSetGenerator self._trainSetGenerator = trainSetGenerator self._approxParameters["trainSetGenerator"] = self.trainSetGenerator if (not 'trainSetGeneratorOld' in locals() or trainSetGeneratorOld != self.trainSetGenerator): self.resetSamples() @property def testSetGenerator(self): """Value of testSetGenerator.""" return self._testSetGenerator @testSetGenerator.setter def testSetGenerator(self, testSetGenerator): if 'generatePoints' not in dir(testSetGenerator): raise RROMPyException("testSetGenerator type not recognized.") if (hasattr(self, '_testSetGenerator') and self.testSetGenerator is not None): testSetGeneratorOld = self.testSetGenerator self._testSetGenerator = testSetGenerator self._approxParameters["testSetGenerator"] = self.testSetGenerator if (not 'testSetGeneratorOld' in locals() or testSetGeneratorOld != self.testSetGenerator): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = [] def initEstNormer(self): """Initialize estimator norm engine.""" if not hasattr(self, "estNormer"): from rrompy.hfengines.base import ProblemEngineBase as PEB self.estNormer = PEB() # L2 norm self.estNormer.V = self.HFEngine.V self.estNormer.buildEnergyNormForm() def errorEstimator(self, mus:List[np.complex]) -> List[np.complex]: """ Standard residual-based error estimator with explicit residual computation. """ self.setupApprox() nmus = len(mus) err = np.empty(nmus) if self.HFEngine.nbs == 1: RHS = self.getRHS(mus[0], homogeneized = self.homogeneized) RHSNorm = self.estNormer.norm(RHS) for j in range(nmus): res = self.getRes(mus[j], homogeneized = self.homogeneized) err[j] = self.estNormer.norm(res) / RHSNorm else: for j in range(nmus): res = self.getRes(mus[j], homogeneized = self.homogeneized) RHS = self.getRHS(mus[j], homogeneized = self.homogeneized) err[j] = self.estNormer.norm(res) / self.estNormer.norm(RHS) return np.abs(err) def getMaxErrorEstimator(self, mus, plot : bool = False)\ -> Tuple[Np1D, int, float]: """ Compute maximum of (and index of maximum of) error estimator over given parameters. """ errorEstTest = self.errorEstimator(mus) idxMaxEst = np.argmax(errorEstTest) maxEst = errorEstTest[idxMaxEst] if plot and not np.all(np.isinf(errorEstTest)): from matplotlib import pyplot as plt onemus = np.ones(self.S) plt.figure() plt.semilogy(np.real(mus), errorEstTest, 'k') plt.semilogy(np.real(mus[[0, -1]]), [self.greedyTol] * 2, 'r--') plt.semilogy(np.real(self.mus), 2. * self.greedyTol * onemus, '*m') plt.semilogy(np.real(mus[idxMaxEst]), maxEst, 'xr') plt.grid() plt.show() plt.close() return errorEstTest, idxMaxEst, maxEst def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, complex]: """Compute next greedy snapshot of solution map.""" modeAssert(self._mode, message = "Cannot add greedy sample.") mu = self.muTest[muidx] if self.verbosity >= 2: verbosityDepth("MAIN", ("Adding {}-th sample point at {} to " "training set.").format( self.samplingEngine.nsamples + 1, mu), timestamp = self.timestamp) self.mus = np.append(self.mus, mu) idxs = np.arange(len(self.muTest)) mask = np.ones_like(idxs, dtype = bool) mask[muidx] = False idxs = idxs[mask] self.muTest = self.muTest[idxs] self.samplingEngine.nextSample(mu, homogeneized = self.homogeneized) errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator( self.muTest, plotEst) return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] - def __preliminaryTraining(self): + def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" modeAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.samples is not None: return if self.verbosity >= 2: verbosityDepth("INIT", "Starting computation of snapshots.", timestamp = self.timestamp) self.resetSamples() self.mus, _ = self.trainSetGenerator.generatePoints(self.nTrainPoints) muTestBase, _ = self.testSetGenerator.generatePoints(self.nTestPoints) proxVal = np.min(np.abs(muTestBase.reshape(-1, 1) - np.tile(self.mus.reshape(1, -1), [self.nTestPoints, 1])), axis = 1) proxMask = ~(proxVal < 1e-12 * np.abs(muTestBase[0] - muTestBase[-1])) self.muTest = np.empty(np.sum(proxMask) + 1, dtype = np.complex) self.muTest[:-1] = np.sort(muTestBase[proxMask]).flatten() self.muTest[-1] = self.mus[-1] self.mus = self.mus[:-1] for j in range(len(self.mus)): if self.verbosity >= 2: verbosityDepth("MAIN", ("Adding {}-th sample point at {} to training " "set.").format(self.samplingEngine.nsamples+ 1, self.mus[j]), timestamp = self.timestamp) self.samplingEngine.nextSample(self.mus[j], homogeneized = self.homogeneized) - def __enrichTestSet(self, nTest:int): + def _enrichTestSet(self, nTest:int): """Double number of elements of test set.""" muTestExtra, _ = self.testSetGenerator.generatePoints(2 * nTest) muGiven = np.append(self.mus, self.muTest).reshape(1, -1) proxVal = np.min(np.abs(muTestExtra.reshape(-1, 1) - np.tile(muGiven, [2 * nTest, 1])), axis = 1) proxMask = ~(proxVal < 1e-12 * np.abs(muTestExtra[0]-muTestExtra[-1])) muTestNew = np.empty(len(self.muTest) + np.sum(proxMask), dtype = np.complex) muTestNew[: len(self.muTest)] = self.muTest muTestNew[len(self.muTest) :] = muTestExtra[proxMask] self.muTest = np.sort(muTestNew) if self.verbosity >= 5: verbosityDepth("MAIN", "Enriching test set by {} elements.".format( np.sum(proxMask)), timestamp = self.timestamp) def greedy(self, plotEst : bool = False): """Compute greedy snapshots of solution map.""" modeAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.samples is not None: return - self.__preliminaryTraining() + self._preliminaryTraining() nTest = self.nTestPoints errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(-1, plotEst) if self.verbosity >= 2: verbosityDepth("MAIN", ("Uniform testing error estimate " "{:.4e}.").format(maxErrorEst), timestamp = self.timestamp) trainedModelOld = copy(self.trainedModel) while (self.samplingEngine.nsamples < self.maxIter and maxErrorEst > self.greedyTol): if (1. - self.refinementRatio) * nTest > len(self.muTest): - self.__enrichTestSet(nTest) + self._enrichTestSet(nTest) nTest = len(self.muTest) muTestOld, maxErrorEstOld = self.muTest, maxErrorEst errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( muidx, plotEst) if self.verbosity >= 2: verbosityDepth("MAIN", ("Uniform testing error estimate " "{:.4e}.").format(maxErrorEst), timestamp = self.timestamp) if (np.isnan(maxErrorEst) or np.isinf(maxErrorEst) or maxErrorEstOld < maxErrorEst * self.TOL_INSTABILITY): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop termination.")) maxErrorEst = maxErrorEstOld self.muTest = muTestOld self.mus = self.mus[:-1] self.samplingEngine.popSample() self.trainedModel.data = copy(trainedModelOld.data) break trainedModelOld.data = copy(self.trainedModel.data) if (self.interactive and maxErrorEst <= self.greedyTol): verbosityDepth("MAIN", ("Required tolerance {} achieved. Want " "to decrease greedyTol and continue? " "Y/N").format(self.greedyTol), timestamp = self.timestamp, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": verbosityDepth("MAIN", "Reducing value of greedyTol...", timestamp = self.timestamp) while maxErrorEst <= self._greedyTol: self._greedyTol *= .5 if (self.interactive and self.samplingEngine.nsamples >= self.maxIter): verbosityDepth("MAIN", ("Maximum number of iterations {} " "reached. Want to increase maxIter " "and continue? Y/N").format( self.maxIter), timestamp = self.timestamp, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": verbosityDepth("MAIN", "Doubling value of maxIter...", timestamp = self.timestamp) self._maxIter *= 2 if self.verbosity >= 2: verbosityDepth("DEL", ("Done computing snapshots (final snapshot " "count: {}).").format( self.samplingEngine.nsamples), timestamp = self.timestamp) def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return (super().checkComputedApprox() and self.S == self.trainedModel.data.projMat.shape[1]) def computeScaleFactor(self): """Compute parameter rescaling factor.""" modeAssert(self._mode, message = "Cannot compute rescaling factor.") self.scaleFactor= np.abs(np.power(self.trainSetGenerator.lims[0], self.HFEngine.rescalingExp) - np.power(self.trainSetGenerator.lims[1], self.HFEngine.rescalingExp)) / 2. def assembleReducedResidualGramian(self, pMat:Np2D): """ Build residual gramian of reduced linear system through projections. """ self.initEstNormer() if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.estNormer.innerProduct(pMat, pMat) else: Sold = self.trainedModel.data.gramian.shape[0] if Sold > self.S: gramian = self.trainedModel.data.gramian[: self.S, : self.S] else: gramian = np.empty((self.S, self.S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = ( self.estNormer.innerProduct(pMat[:, Sold :], pMat[:, : Sold])) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = ( self.estNormer.innerProduct(pMat[:, Sold :], pMat[:, Sold :])) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D], pMat:Np2D, scaling : float = 1.): """ Build blocks (of type bb) of reduced linear system through projections. """ self.initEstNormer() nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = scaling ** i * bs[i] resbb[i, i] = self.estNormer.innerProduct(Mbi, Mbi) for j in range(i): Mbj = scaling ** j * bs[j] resbb[i, j] = self.estNormer.innerProduct(Mbj, Mbi) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:Np2D, scaling : float = 1.): """ Build blocks (of type Ab) of reduced linear system through projections. """ self.initEstNormer() nAs = len(As) nbs = len(bs) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): resAb = np.empty((nbs, self.S, nAs), dtype = np.complex) for j in range(nAs): MAj = scaling ** (j + 1) * As[j].dot(pMat) for i in range(nbs): Mbi = scaling ** (i + 1) * bs[i] resAb[i, :, j] = self.estNormer.innerProduct(MAj, Mbi) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == self.S: return if Sold > self.S: resAb = self.trainedModel.data.resAb[:, : self.S, :] else: resAb = np.empty((nbs, self.S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): MAj = scaling ** (j + 1) * As[j].dot(pMat[:, Sold :]) for i in range(nbs): Mbi = scaling ** (i + 1) * bs[i] resAb[i, Sold :, j] = self.estNormer.innerProduct(MAj, Mbi) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:Np2D, scaling : float = 1., basic : bool = False): """ Build blocks (of type AA) of reduced linear system through projections. """ self.initEstNormer() nAs = len(As) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if basic: MAEnd = scaling ** nAs * As[-1].dot(pMat) resAA = self.estNormer.innerProduct(MAEnd, MAEnd) else: resAA = np.empty((self.S, nAs, self.S, nAs), dtype = np.complex) for i in range(nAs): MAi = scaling ** (i + 1) * As[i].dot(pMat) resAA[:, i, :, i] = self.estNormer.innerProduct(MAi, MAi) for j in range(i): MAj = scaling ** (j + 1) * As[j].dot(pMat) resAA[:, i, :, j] = self.estNormer.innerProduct(MAj, MAi) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == self.S: return if Sold > self.S: if basic: resAA = self.trainedModel.data.resAA[: self.S, : self.S] else: resAA = self.trainedModel.data.resAA[: self.S, :, : self.S, :] else: if basic: resAA = np.empty((self.S, self.S), dtype = np.complex) resAA[: Sold, : Sold] = self.trainedModel.data.resAA MAi = scaling ** nAs * As[-1].dot(pMat) resAA[: Sold, Sold :] = ( self.estNormer.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, : Sold] = resAA[: Sold, Sold :].T.conj() resAA[Sold :, Sold :] = ( self.estNormer.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) else: resAA = np.empty((self.S, nAs, self.S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): MAi = scaling ** (i + 1) * As[i].dot(pMat) resAA[: Sold, i, Sold :, i] = ( self.estNormer.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = ( self.estNormer.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) for j in range(i): MAj = scaling ** (j + 1) * As[j].dot(pMat) resAA[: Sold, i, Sold :, j] = ( self.estNormer.innerProduct(MAj[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, j] = ( self.estNormer.innerProduct(MAj[:, : Sold], MAi[:, Sold :])) resAA[Sold :, i, Sold :, j] = ( self.estNormer.innerProduct(MAj[:, Sold :], MAi[:, Sold :])) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA diff --git a/rrompy/reduction_methods/taylor/approximant_taylor_pade.py b/rrompy/reduction_methods/taylor/approximant_taylor_pade.py index 9ec752f..a291344 100644 --- a/rrompy/reduction_methods/taylor/approximant_taylor_pade.py +++ b/rrompy/reduction_methods/taylor/approximant_taylor_pade.py @@ -1,456 +1,456 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from rrompy.reduction_methods.trained_model import (TrainedModelData, TrainedModelPade as tModel) from .generic_approximant_taylor import GenericApproximantTaylor from rrompy.sampling.base.pod_engine import PODEngine from rrompy.utilities.base.types import Np1D, Np2D, Tuple, DictAny, HFEng from rrompy.utilities.base import verbosityDepth, purgeDict from rrompy.utilities.exception_manager import (RROMPyException, modeAssert, RROMPyWarning) __all__ = ['ApproximantTaylorPade'] class ApproximantTaylorPade(GenericApproximantTaylor): """ ROM single-point fast Pade' approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'rho': weight for computation of original Pade' approximant; defaults to np.inf, i.e. fast approximant; - 'M': degree of Pade' approximant numerator; defaults to 0; - 'N': degree of Pade' approximant denominator; defaults to 0; - 'E': total number of derivatives current approximant relies upon; defaults to 1; - 'robustTol': tolerance for robust Pade' denominator management; defaults to 0; - 'sampleType': label of sampling type; available values are: - 'ARNOLDI': orthogonalization of solution derivatives through Arnoldi algorithm; - 'KRYLOV': standard computation of solution derivatives. Defaults to 'KRYLOV'. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'rho': weight for computation of original Pade' approximant; - 'M': degree of Pade' approximant numerator; - 'N': degree of Pade' approximant denominator; - 'E': total number of derivatives current approximant relies upon; - 'robustTol': tolerance for robust Pade' denominator management; - 'sampleType': label of sampling type. POD: Whether to compute QR factorization of derivatives. rho: Weight of approximant. M: Numerator degree of approximant. N: Denominator degree of approximant. E: Number of solution derivatives over which current approximant is based upon. robustTol: Tolerance for robust Pade' denominator management. sampleType: Label of sampling type. initialHFData: HF problem initial data. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. G: Square Numpy 2D vector of size (N+1) corresponding to Pade' denominator matrix (see paper). uApp: Last evaluated approximant as numpy complex vector. """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["M", "N", "robustTol", "rho"]) + self._preInit() + self._addParametersToList(["M", "N", "robustTol", "rho"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) - self.__postInit() + self._postInit() @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["M", "N", "robustTol", "rho"], True, True, baselevel = 1) keyList = list(approxParameters.keys()) if "rho" in keyList: self._rho = approxParameters["rho"] elif not hasattr(self, "_rho") or self.rho is None: self._rho = np.inf GenericApproximantTaylor.approxParameters.fset(self, approxParametersCopy) self.rho = self._rho if "robustTol" in keyList: self.robustTol = approxParameters["robustTol"] elif not hasattr(self, "_robustTol") or self._robustTol is None: self.robustTol = 0 self._ignoreParWarnings = True if "M" in keyList: self.M = approxParameters["M"] elif hasattr(self, "_M") and self._M is not None: self.M = self.M else: self.M = 0 del self._ignoreParWarnings if "N" in keyList: self.N = approxParameters["N"] elif hasattr(self, "_N") and self._N is not None: self.N = self.N else: self.N = 0 @property def rho(self): """Value of rho.""" return self._rho @rho.setter def rho(self, rho): self._rho = np.abs(rho) self._approxParameters["rho"] = self.rho @property def M(self): """Value of M. Its assignment may change E.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M if not hasattr(self, "_ignoreParWarnings"): self.checkMNE() @property def N(self): """Value of N. Its assignment may change E.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N if not hasattr(self, "_ignoreParWarnings"): self.checkMNE() def checkMNE(self): """Check consistency of M, N, and E.""" if not hasattr(self, "_E") or self.E is None: return M = self.M if (hasattr(self, "_M") and self.M is not None) else 0 N = self.N if (hasattr(self, "_N") and self.N is not None) else 0 msg = "max(M, N)" if self.rho == np.inf else "M + N" bound = eval(msg) if self.E < bound: RROMPyWarning(("Prescribed E is too small. Updating E to " "{}.").format(msg)) self.E = bound del M, N @property def robustTol(self): """Value of tolerance for robust Pade' denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol @property def E(self): """Value of E.""" return self._E @E.setter def E(self, E): GenericApproximantTaylor.E.fset(self, E) self.checkMNE() - def __setupDenominator(self): + def _setupDenominator(self): """Compute Pade' denominator.""" if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of denominator.", timestamp = self.timestamp) while self.N > 0: if self.POD: ev, eV = self.findeveVGQR() else: ev, eV = self.findeveVGExplicit() newParameters = checkRobustTolerance(ev, self.E, self.robustTol) if not newParameters: break self.approxParameters = newParameters if self.N <= 0: eV = np.ones((1, 1)) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing denominator.", timestamp = self.timestamp) return eV[::-1, 0] - def __setupNumerator(self): + def _setupNumerator(self): """Compute Pade' numerator.""" if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of numerator.", timestamp = self.timestamp) P = np.zeros((self.E + 1, self.M + 1), dtype = np.complex) for i in range(self.E + 1): l = min(self.M + 1, i + self.N + 1) P[i, i : l] = self.trainedModel.data.Q[: l - i] if self.verbosity >= 7: verbosityDepth("DEL", "Done computing numerator.", timestamp = self.timestamp) return self.rescaleParameter(P.T).T def setupApprox(self): """ Compute Pade' approximant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeDerivatives() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, self.samplingEngine.samples[:,:self.E + 1], self.HFEngine.rescalingExp) data.polytype = "MONOMIAL" self.trainedModel.data = data else: self.trainedModel.data.projMat = ( self.samplingEngine.samples[:, : self.E + 1]) if self.N > 0: - Q = self.__setupDenominator() + Q = self._setupDenominator() else: Q = np.ones(1, dtype = np.complex) self.trainedModel.data.Q = np.copy(Q) self.trainedModel.data.scaleFactor = self.scaleFactor - P = self.__setupNumerator() + P = self._setupNumerator() if self.sampleType == "ARNOLDI": P = self.samplingEngine.RArnoldi.dot(P) self.trainedModel.data.P = np.copy(P) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def rescaleParameter(self, R:Np2D, A : Np2D = None, exponent : float = 1.) -> Np2D: """ Prepare parameter rescaling. Args: R: Matrix whose columns need rescaling. A(optional): Matrix whose diagonal defines scaling factor. If None, previous value of scaleFactor is used. Defaults to None. exponent(optional): Exponent of scaling factor in matrix diagonal. Defaults to 1. Returns: Rescaled matrix. """ modeAssert(self._mode, message = "Cannot compute rescaling factor.") if A is not None: aDiag = np.diag(A) scaleCoeffs = np.polyfit(np.arange(A.shape[1]), np.log(aDiag), 1) self.scaleFactor = np.exp(- scaleCoeffs[0] / exponent) return np.multiply(R, np.power(self.scaleFactor,np.arange(R.shape[1]))) def buildG(self): """Assemble Pade' denominator matrix.""" modeAssert(self._mode, message = "Cannot compute G matrix.") self.computeDerivatives() if self.verbosity >= 10: verbosityDepth("INIT", "Building gramian matrix.", timestamp = self.timestamp) if self.rho == np.inf: Nmin = self.E - self.N else: Nmin = self.M - self.N + 1 if self.sampleType == "KRYLOV": DerE = self.samplingEngine.samples[:, Nmin : self.E + 1] G = self.HFEngine.innerProduct(DerE, DerE) DerE = self.rescaleParameter(DerE, G, 2.) G = self.HFEngine.innerProduct(DerE, DerE) else: RArnE = self.samplingEngine.RArnoldi[: self.E + 1, Nmin : self.E + 1] RArnE = self.rescaleParameter(RArnE, RArnE[Nmin :, :]) G = RArnE.T.conj().dot(RArnE) if self.rho == np.inf: self.G = G else: Gbig = G self.G = np.zeros((self.N + 1, self.N + 1), dtype = np.complex) for k in range(self.E - self.M): self.G += self.rho ** (2 * k) * Gbig[k : k + self.N + 1, k : k + self.N + 1] if self.verbosity >= 10: verbosityDepth("DEL", "Done building gramian.", timestamp = self.timestamp) def findeveVGExplicit(self) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of Pade' denominator matrix. """ modeAssert(self._mode, message = "Cannot solve eigenvalue problem.") self.buildG() if self.verbosity >= 7: verbosityDepth("INIT", "Solving eigenvalue problem for gramian matrix.", timestamp = self.timestamp) ev, eV = np.linalg.eigh(self.G) if self.verbosity >= 5: try: condev = ev[-1] / ev[0] except: condev = np.inf verbosityDepth("MAIN", ("Solved eigenvalue problem of size {} " "with condition number {:.4e}.").format( self.N + 1, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return ev, eV def findeveVGQR(self) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of Pade' denominator matrix through SVD of R factor. See ``Householder triangularization of a quasimatrix'', L.Trefethen, 2008 for QR algorithm. Returns: Eigenvalues in ascending order and corresponding eigenvector matrix. """ modeAssert(self._mode, message = "Cannot solve eigenvalue problem.") self.computeDerivatives() if self.rho == np.inf: Nmin = self.E - self.N else: Nmin = self.M - self.N + 1 if self.sampleType == "KRYLOV": A = copy(self.samplingEngine.samples[:, Nmin : self.E + 1]) self.PODEngine = PODEngine(self.HFEngine) if self.verbosity >= 10: verbosityDepth("INIT", "Orthogonalizing samples.", timestamp = self.timestamp) R = self.PODEngine.QRHouseholder(A, only_R = True) if self.verbosity >= 10: verbosityDepth("DEL", "Done orthogonalizing samples.", timestamp = self.timestamp) else: R = self.samplingEngine.RArnoldi[: self.E + 1, Nmin : self.E + 1] R = self.rescaleParameter(R, R[R.shape[0] - R.shape[1] :, :]) if self.rho == np.inf: if self.verbosity >= 7: verbosityDepth("INIT", ("Solving svd for square root of " "gramian matrix."), timestamp = self.timestamp) sizeI = R.shape[0] _, s, V = np.linalg.svd(R, full_matrices = False) else: if self.verbosity >= 10: verbosityDepth("INIT", ("Building matrix stack for square " "root of gramian."), timestamp = self.timestamp) Rtower = np.zeros((R.shape[0] * (self.E - self.M), self.N + 1), dtype = np.complex) for k in range(self.E - self.M): Rtower[k * R.shape[0] : (k + 1) * R.shape[0], :] = ( self.rho ** k * R[:, self.M - self.N + 1 + k : self.M + 1 + k]) if self.verbosity >= 10: verbosityDepth("DEL", "Done building matrix stack.", timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("INIT", ("Solving svd for square root of " "gramian matrix."), timestamp = self.timestamp) sizeI = Rtower.shape[0] _, s, V = np.linalg.svd(Rtower, full_matrices = False) eV = V[::-1, :].T.conj() if self.verbosity >= 5: try: condev = s[0] / s[-1] except: condev = np.inf verbosityDepth("MAIN", ("Solved svd problem of size {} x {} with " "condition number {:.4e}.").format(sizeI, self.N + 1, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return s[::-1], eV def radiusPade(self, mu:Np1D, mu0 : float = None) -> float: """ Compute translated radius to be plugged into Pade' approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.mu0. Returns: Translated radius to be plugged into Pade' approximant. """ return self.trainedModel.radiusPade(mu, mu0) def getResidues(self) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues() diff --git a/rrompy/reduction_methods/taylor/approximant_taylor_rb.py b/rrompy/reduction_methods/taylor/approximant_taylor_rb.py index 2982977..2d8a7e8 100644 --- a/rrompy/reduction_methods/taylor/approximant_taylor_rb.py +++ b/rrompy/reduction_methods/taylor/approximant_taylor_rb.py @@ -1,229 +1,229 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import copy import numpy as np from .generic_approximant_taylor import GenericApproximantTaylor from rrompy.reduction_methods.trained_model import TrainedModelRB as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.reduction_methods.base.rb_utils import projectAffineDecomposition from rrompy.sampling.base.pod_engine import PODEngine from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List, DictAny, HFEng from rrompy.utilities.base import purgeDict, verbosityDepth from rrompy.utilities.exception_manager import RROMPyWarning __all__ = ['ApproximantTaylorRB'] class ApproximantTaylorRB(GenericApproximantTaylor): """ ROM single-point fast RB approximant computation for parametric problems with polynomial dependence up to degree 2. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'R': rank for Galerkin projection; defaults to E + 1; - 'E': total number of derivatives current approximant relies upon; defaults to 1; - 'sampleType': label of sampling type; available values are: - 'ARNOLDI': orthogonalization of solution derivatives through Arnoldi algorithm; - 'KRYLOV': standard computation of solution derivatives. Defaults to 'KRYLOV'. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'R': rank for Galerkin projection; - 'E': total number of derivatives current approximant relies upon; - 'sampleType': label of sampling type. POD: Whether to compute QR factorization of derivatives. R: Rank for Galerkin projection. E: Number of solution derivatives over which current approximant is based upon. sampleType: Label of sampling type, i.e. 'KRYLOV'. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. ARBs: List of sparse matrices (in CSC format) representing RB coefficients of linear system matrix wrt mu. bRBs: List of numpy vectors representing RB coefficients of linear system RHS wrt mu. """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["R"]) + self._preInit() + self._addParametersToList(["R"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) if self.verbosity >= 10: verbosityDepth("INIT", "Computing affine blocks of system.", timestamp = self.timestamp) if self.verbosity >= 10: verbosityDepth("DEL", "Done computing affine blocks.", timestamp = self.timestamp) - self.__postInit() + self._postInit() @property def approxParameters(self): """ Value of approximant parameters. Its assignment may change M, N and S. """ return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["R"], True, True, baselevel = 1) GenericApproximantTaylor.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) if "R" in keyList: self.R = approxParameters["R"] else: self.R = self.E + 1 @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): GenericApproximantTaylor.POD.fset(self, POD) if (hasattr(self, "_sampleType") and self.sampleType == "ARNOLDI" and not self.POD): RROMPyWarning(("Arnoldi sampling implicitly forces POD-type " "derivative management.")) @property def sampleType(self): """Value of sampleType.""" return self._sampleType @sampleType.setter def sampleType(self, sampleType): GenericApproximantTaylor.sampleType.fset(self, sampleType) if (hasattr(self, "_POD") and not self.POD and self.sampleType == "ARNOLDI"): RROMPyWarning(("Arnoldi sampling implicitly forces POD-type " "derivative management.")) @property def R(self): """Value of R. Its assignment may change S.""" return self._R @R.setter def R(self, R): if R < 0: raise RROMPyException("R must be non-negative.") self._R = R self._approxParameters["R"] = self.R if hasattr(self, "_E") and self.E + 1 < self.R: RROMPyWarning("Prescribed E is too small. Updating E to R - 1.") self.E = self.R - 1 def setupApprox(self): """Setup RB system.""" if self.checkComputedApprox(): return if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeDerivatives() if self.verbosity >= 7: verbosityDepth("INIT", "Computing projection matrix.", timestamp = self.timestamp) if self.POD and not self.sampleType == "ARNOLDI": self.PODEngine = PODEngine(self.HFEngine) pMatQ, pMatR = self.PODEngine.QRHouseholder( self.samplingEngine.samples) if self.POD: if self.sampleType == "ARNOLDI": pMatR = self.samplingEngine.RArnoldi pMatQ = self.samplingEngine.samples U, _, _ = np.linalg.svd(pMatR[: self.E + 1, : self.E + 1]) pMat = pMatQ[:, : self.E + 1].dot(U[:, : self.R]) else: pMat = self.samplingEngine.samples[:, : self.R] if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, np.copy(pMat), self.HFEngine.rescalingExp) data.thetaAs = self.HFEngine.affineWeightsA(self.mu0) data.thetabs = self.HFEngine.affineWeightsb(self.mu0, self.homogeneized) data.ARBs, data.bRBs = self.assembleReducedSystem(pMat) self.trainedModel.data = data else: pMatOld = self.trainedModel.data.projMat Sold = pMatOld.shape[1] ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMatOld) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs self.trainedModel.data.projMat = np.copy(pMat) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing projection matrix.", timestamp = self.timestamp) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def assembleReducedSystem(self, pMat : Np2D = None, pMatOld : Np2D = None)\ -> Tuple[List[Np2D], List[Np1D]]: """Build affine blocks of RB linear system through projections.""" if pMat is None: self.setupApprox() ARBs = self.trainedModel.data.ARBs bRBs = self.trainedModel.data.bRBs else: if self.verbosity >= 10: verbosityDepth("INIT", "Projecting affine terms of HF model.", timestamp = self.timestamp) As = self.HFEngine.affineLinearSystemA(self.mu0) bs = self.HFEngine.affineLinearSystemb(self.mu0, self.homogeneized) ARBsOld = None if pMatOld is None else self.trainedModel.data.ARBs bRBsOld = None if pMatOld is None else self.trainedModel.data.bRBs ARBs, bRBs = projectAffineDecomposition(As, bs, pMat, ARBsOld, bRBsOld, pMatOld) if self.verbosity >= 10: verbosityDepth("DEL", "Done projecting affine terms.", timestamp = self.timestamp) return ARBs, bRBs diff --git a/rrompy/reduction_methods/taylor/generic_approximant_taylor.py b/rrompy/reduction_methods/taylor/generic_approximant_taylor.py index 9925d1f..33658fa 100644 --- a/rrompy/reduction_methods/taylor/generic_approximant_taylor.py +++ b/rrompy/reduction_methods/taylor/generic_approximant_taylor.py @@ -1,185 +1,185 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.utilities.base.types import DictAny, HFEng from rrompy.utilities.base import purgeDict, verbosityDepth from rrompy.utilities.exception_manager import (RROMPyException, modeAssert, RROMPyWarning) __all__ = ['GenericApproximantTaylor'] class GenericApproximantTaylor(GenericApproximant): """ ROM single-point approximant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'E': total number of derivatives current approximant relies upon; defaults to 1; - 'sampleType': label of sampling type; available values are: - 'ARNOLDI': orthogonalization of solution derivatives through Arnoldi algorithm; - 'KRYLOV': standard computation of solution derivatives. Defaults to 'KRYLOV'. Defaults to empty dict. homogeneized: Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterList: Recognized keys of approximant parameters: - 'POD': whether to compute POD of snapshots; - 'E': total number of derivatives current approximant relies upon; - 'sampleType': label of sampling type. POD: Whether to compute QR factorization of derivatives. E: Number of solution derivatives over which current approximant is based upon. sampleType: Label of sampling type. initialHFData: HF problem initial data. samplingEngine: Sampling engine. uHF: High fidelity solution with wavenumber lastSolvedHF as numpy complex vector. lastSolvedHF: Wavenumber corresponding to last computed high fidelity solution. uApp: Last evaluated approximant as numpy complex vector. """ def __init__(self, HFEngine:HFEng, mu0 : complex = 0, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): - self.__preInit() - self.__addParametersToList(["E", "sampleType"]) + self._preInit() + self._addParametersToList(["E", "sampleType"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) - self.__postInit() + self._postInit() def setupSampling(self): """Setup sampling engine.""" modeAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_sampleType"): return if self.sampleType == "ARNOLDI": from rrompy.sampling.linear_problem.sampling_engine_arnoldi \ import SamplingEngineArnoldi super().setupSampling(SamplingEngineArnoldi) elif self.sampleType == "KRYLOV": from rrompy.sampling.linear_problem.sampling_engine_krylov \ import SamplingEngineKrylov super().setupSampling(SamplingEngineKrylov) else: raise RROMPyException("Sample type not recognized.") @property def approxParameters(self): """Value of approximant parameters. Its assignment may change E.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) approxParametersCopy = purgeDict(approxParameters, ["E", "sampleType"], True, True, baselevel = 1) GenericApproximant.approxParameters.fset(self, approxParametersCopy) keyList = list(approxParameters.keys()) if "E" in keyList: self.E = approxParameters["E"] elif hasattr(self, "_E") and self._E is not None: self.E = self.E else: self.E = 1 if "sampleType" in keyList: self.sampleType = approxParameters["sampleType"] elif not hasattr(self, "_sampleType") or self._sampleType is None: self.sampleType = "KRYLOV" @property def E(self): """Value of E.""" return self._E @E.setter def E(self, E): if E < 0: raise RROMPyException("E must be non-negative.") self._E = E self._approxParameters["E"] = self.E @property def sampleType(self): """Value of sampleType.""" return self._sampleType @sampleType.setter def sampleType(self, sampleType): if hasattr(self, "_sampleType") and self._sampleType is not None: sampleTypeOld = self.sampleType else: sampleTypeOld = -1 try: sampleType = sampleType.upper().strip().replace(" ","") if sampleType not in ["ARNOLDI", "KRYLOV"]: raise RROMPyException("Sample type not recognized.") self._sampleType = sampleType except: RROMPyWarning(("Prescribed sampleType not recognized. Overriding " "to 'KRYLOV'.")) self._sampleType = "KRYLOV" self._approxParameters["sampleType"] = self.sampleType if sampleTypeOld != self.sampleType: self.resetSamples() def computeDerivatives(self): """Compute derivatives of solution map starting from order 0.""" modeAssert(self._mode, message = "Cannot start derivative computation.") if self.samplingEngine.nsamples <= self.E: if self.verbosity >= 5: verbosityDepth("INIT", "Starting computation of derivatives.", timestamp = self.timestamp) self.samplingEngine.iterSample(self.mu0, self.E + 1, homogeneized = self.homogeneized) if self.verbosity >= 5: verbosityDepth("DEL", "Done computing derivatives.", timestamp = self.timestamp) def normApprox(self, mu:complex, homogeneized : bool = False) -> float: """ Compute norm of approximant at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Target norm of approximant. """ if self.sampleType != "ARNOLDI" or self.homogeneized != homogeneized: return super().normApprox(mu, homogeneized) return np.linalg.norm(self.getApproxReduced(mu, homogeneized)) diff --git a/rrompy/sampling/linear_problem/sampling_engine_lagrange.py b/rrompy/sampling/linear_problem/sampling_engine_lagrange.py index 45cfe11..fde4253 100644 --- a/rrompy/sampling/linear_problem/sampling_engine_lagrange.py +++ b/rrompy/sampling/linear_problem/sampling_engine_lagrange.py @@ -1,93 +1,93 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.sampling.base.sampling_engine_base import SamplingEngineBase from rrompy.utilities.base.types import Np1D, Np2D from rrompy.utilities.base import verbosityDepth from rrompy.utilities.exception_manager import RROMPyException __all__ = ['SamplingEngineLagrange'] class SamplingEngineLagrange(SamplingEngineBase): """HERE""" nameBase = 1 def preprocesssamples(self, idxs:Np1D): if self.samples is None: return return self.samples[:, idxs] def postprocessu(self, u:Np1D, overwrite : bool = False): return u - def __getSampleConcurrence(self, mu:complex, previous:Np1D, + def _getSampleConcurrence(self, mu:complex, previous:Np1D, homogeneized : bool = False) -> Np1D: samplesOld = self.preprocesssamples(previous) RHS = self.HFEngine.b(mu, len(previous), homogeneized = homogeneized) for i in range(1, len(previous) + 1): RHS -= self.HFEngine.A(mu, i).dot(samplesOld[:, - i]) return self.solveLS(mu, RHS = RHS, homogeneized = homogeneized) def nextSample(self, mu:complex, overwrite : bool = False, homogeneized : bool = False) -> Np1D: ns = self.nsamples muidxs = np.nonzero(self.mus[:ns] == mu)[0] if len(muidxs) > 0: - u = self.__getSampleConcurrence(mu, np.sort(muidxs), homogeneized) + u = self._getSampleConcurrence(mu, np.sort(muidxs), homogeneized) else: u = self.solveLS(mu, homogeneized = homogeneized) u = self.postprocessu(u, overwrite = overwrite) if overwrite: self.samples[:, ns] = u self.mus[ns] = mu else: if ns == 0: self.samples = u[:, None] else: self.samples = np.hstack((self.samples, u[:, None])) self.mus = self.mus + [mu] self.nsamples += 1 return u def iterSample(self, mus:Np1D, homogeneized : bool = False) -> Np2D: if self.verbosity >= 5: verbosityDepth("INIT", "Starting sampling iterations.", timestamp = self.timestamp) n = mus.size if n <= 0: raise RROMPyException(("Number of samples must be positive.")) self.resetHistory() if self.verbosity >= 7: verbosityDepth("MAIN", "Computing sample {}/{}.".format(1, n), timestamp = self.timestamp) u = self.nextSample(mus[0], homogeneized = homogeneized) if n > 1: self.preallocateSamples(u, mus[0], n) for j in range(1, n): if self.verbosity >= 7: verbosityDepth("MAIN", "Computing sample {}/{}.".format(j + 1, n), timestamp = self.timestamp) self.nextSample(mus[j], overwrite = True, homogeneized = homogeneized) if self.verbosity >= 5: verbosityDepth("DEL", "Finished sampling iterations.", timestamp = self.timestamp) return self.samples diff --git a/setup.py b/setup.py index f9a2f00..87e8526 100644 --- a/setup.py +++ b/setup.py @@ -1,52 +1,52 @@ # Copyright (C) 2015-2018 by the RBniCS authors # # This file is part of RBniCS. # # RBniCS is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RBniCS is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RBniCS. If not, see . # import os from setuptools import find_packages, setup rrompy_directory = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) #rrompy_directory = os.path.join(rrompy_directory, 'rrompy') setup(name="RROMPy", description="Rational reduced order modelling in Python", long_description="Rational reduced order modelling in Python", author="Davide Pradovera", author_email="davide.pradovera@epfl.ch", - version="1.1", + version="1.1.1", license="GNU Library or Lesser General Public License (LGPL)", classifiers=[ "Development Status :: 1 - Planning" "Intended Audience :: Developers", "Intended Audience :: Science/Research", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)", "Topic :: Scientific/Engineering :: Mathematics", "Topic :: Software Development :: Libraries :: Python Modules", ], packages=find_packages(rrompy_directory), setup_requires=[ "pytest-runner" ], tests_require=[ "pytest" ], zip_safe=False )