diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py index 808ea63..43f6e12 100644 --- a/rrompy/reduction_methods/base/generic_approximant.py +++ b/rrompy/reduction_methods/base/generic_approximant.py @@ -1,908 +1,907 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np from itertools import product as iterprod from copy import deepcopy as copy from os import remove as osrm from rrompy.sampling.standard import (SamplingEngineStandard, SamplingEngineStandardPOD) from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, Tuple, ListAny, strLst, paramVal, paramList, sampList) from rrompy.utilities.base import (purgeDict, verbosityManager as vbMng, getNewFilename) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPy_READY, RROMPy_FRAGILE) from rrompy.utilities.base import pickleDump, pickleLoad from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import sampleList, emptySampleList __all__ = ['GenericApproximant'] def addNormFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = False val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addNormDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = True if "dual" not in kwargs.keys(): kwargs["dual"] = True val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addPlotFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addPlotDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addOutParaviewFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaview"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.outParaview(u, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaview" + fieldName, objFunc) def addOutParaviewTimeDomainFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaviewTimeDomain"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) omega = args.pop(0) if len(args) > 0 else np.real(mu) kwargsCopy = copy(kwargs) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargsCopy["name"] = kwargs["name"] + str(j) filesOut += [self.HFEngine.outParaviewTimeDomain(u, omega, *args, **kwargsCopy)] if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc) def getTrainedModelClass(name): from importlib import import_module as im try: return getattr(im("rrompy.reduction_methods.trained_model"), name) except: raise RROMPyException("Trained model name not recognized.") class GenericApproximant: """ ABSTRACT ROM approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. trainedModel: Trained model evaluator. mu0: Default parameter. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList{Soft,Critical}. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ __all__ += [ftype + dtype for ftype, dtype in iterprod( ["norm", "plot", "outParaview", "outParaviewTimeDomain"], ["HF", "RHS", "Approx", "Res", "Err"])] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._mode = RROMPy_READY self.approx_state = approx_state self.verbosity = verbosity self.timestamp = timestamp vbMng(self, "INIT", "Initializing engine of type {}.".format(self.name()), 10) self._HFEngine = HFEngine self.trainedModel = None self.lastSolvedHF = emptyParameterList() self.uHF = emptySampleList() self._addParametersToList(["POD"], [True], ["S"], [1]) if mu0 is None: if hasattr(self.HFEngine, "mu0"): self.mu0 = checkParameter(self.HFEngine.mu0) else: raise RROMPyException(("Center of approximation cannot be " "inferred from HF engine. Parameter " "required")) else: self.mu0 = checkParameter(mu0, self.HFEngine.npar) self.resetSamples() self.approxParameters = approxParameters self._postInit() ### add norm{HF,Err} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["HF", "Err"]: addNormFieldToClass(self, objName) ### add norm{RHS,Res} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["RHS", "Res"]: addNormDualFieldToClass(self, objName) ### add plot{HF,Approx,Err} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["HF", "Approx", "Err"]: addPlotFieldToClass(self, objName) ### add plot{RHS,Res} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["RHS", "Res"]: addPlotDualFieldToClass(self, objName) ### add outParaview{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file. Args: mu: Target parameter. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewFieldToClass(self, objName) ### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file, converted to time domain. Args: mu: Target parameter. omega(optional): frequency. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewTimeDomainFieldToClass(self, objName) def _preInit(self): if not hasattr(self, "depth"): self.depth = 0 else: self.depth += 1 @property def tModelType(self): raise RROMPyException("No trainedModel type assigned.") def initializeModelData(self, datadict): from rrompy.reduction_methods.trained_model import TrainedModelData return (TrainedModelData(datadict["mu0"], datadict.pop("projMat"), datadict["scaleFactor"], datadict.pop("rescalingExp")), ["mu0", "scaleFactor", "mus"]) @property def parameterList(self): """Value of parameterListSoft + parameterListCritical.""" return self.parameterListSoft + self.parameterListCritical def _addParametersToList(self, whatSoft:strLst, defaultSoft:ListAny, whatCritical : strLst = [], defaultCritical : ListAny = [], toBeExcluded : strLst = []): if not hasattr(self, "parameterToBeExcluded"): self.parameterToBeExcluded = [] self.parameterToBeExcluded = toBeExcluded + self.parameterToBeExcluded if not hasattr(self, "parameterListSoft"): self.parameterListSoft = [] if not hasattr(self, "parameterDefaultSoft"): self.parameterDefaultSoft = {} if not hasattr(self, "parameterListCritical"): self.parameterListCritical = [] if not hasattr(self, "parameterDefaultCritical"): self.parameterDefaultCritical = {} for j, what in enumerate(whatSoft): if what not in self.parameterToBeExcluded: self.parameterListSoft = [what] + self.parameterListSoft self.parameterDefaultSoft[what] = defaultSoft[j] for j, what in enumerate(whatCritical): if what not in self.parameterToBeExcluded: self.parameterListCritical = ([what] + self.parameterListCritical) self.parameterDefaultCritical[what] = defaultCritical[j] def _postInit(self): if self.depth == 0: vbMng(self, "DEL", "Done initializing.", 10) del self.depth else: self.depth -= 1 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEngineStandardPOD else: SamplingEngine = SamplingEngineStandard self.samplingEngine = SamplingEngine(self.HFEngine, sample_state = self.approx_state, verbosity = self.verbosity) @property def HFEngine(self): """Value of HFEngine.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): raise RROMPyException("Cannot change HFEngine.") @property def mu0(self): """Value of mu0.""" return self._mu0 @mu0.setter def mu0(self, mu0): mu0 = checkParameter(mu0) if not hasattr(self, "_mu0") or mu0 != self.mu0: self.resetSamples() self._mu0 = mu0 @property def npar(self): """Number of parameters.""" return self.mu0.shape[1] @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): if not hasattr(self, "approxParameters"): self._approxParameters = {} approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) keyList = list(approxParameters.keys()) for key in self.parameterListCritical: if key in keyList: setattr(self, "_" + key, self.parameterDefaultCritical[key]) for key in self.parameterListSoft: if key in keyList: setattr(self, "_" + key, self.parameterDefaultSoft[key]) fragile = False for key in self.parameterListCritical: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultCritical[key] getattr(self.__class__, key, None).fset(self, val) fragile = fragile or val is None for key in self.parameterListSoft: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultSoft[key] getattr(self.__class__, key, None).fset(self, val) if fragile: self._mode = RROMPy_FRAGILE @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): if hasattr(self, "_POD"): PODold = self.POD else: PODold = -1 self._POD = POD self._approxParameters["POD"] = self.POD if PODold != self.POD: self.samplingEngine = None self.resetSamples() @property def approx_state(self): """Value of approx_state.""" return self._approx_state @approx_state.setter def approx_state(self, approx_state): if hasattr(self, "_approx_state"): approx_stateold = self.approx_state else: approx_stateold = -1 self._approx_state = approx_state if approx_stateold != self.approx_state: self.samplingEngine = None self.resetSamples() @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if S <= 0: raise RROMPyException("S must be positive.") if hasattr(self, "_S") and self._S is not None: Sold = self.S else: Sold = -1 self._S = S self._approxParameters["S"] = self.S if Sold != self.S: self.resetSamples() @property def trainedModel(self): """Value of trainedModel.""" return self._trainedModel @trainedModel.setter def trainedModel(self, trainedModel): self._trainedModel = trainedModel if self._trainedModel is not None: - self._trainedModel.lastSolvedApproxReduced = emptyParameterList() - self._trainedModel.lastSolvedApprox = emptyParameterList() + self._trainedModel.reset() self.lastSolvedApproxReduced = emptyParameterList() self.lastSolvedApprox = emptyParameterList() self.uApproxReduced = emptySampleList() self.uApprox = emptySampleList() def resetSamples(self): if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() else: self.setupSampling() self._mode = RROMPy_READY def plotSamples(self, warping : List[callable] = None, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, plotArgs : dict = {}, **figspecs) -> List[str]: """ Do some nice plots of the samples. Args: warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. plotArgs(optional): Optional arguments for fen/pyplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot plot samples.") return self.samplingEngine.plotSamples(warping, name, save, what, saveFormat, saveDPI, show, plotArgs, **figspecs) def outParaviewSamples(self, name : str = "u", filename : str = "out", times : Np1D = None, what : strLst = 'all', forceNewFile : bool = True, folders : bool = False, filePW = None) -> List[str]: """ Output samples to ParaView file. Args: name(optional): Base name to be used for data output. filename(optional): Name of output file. times(optional): Timestamps. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. filePW(optional): Fenics File entity (for time series). Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewSamples(name, folders, filename, times, what, forceNewFile, filePW) def outParaviewTimeDomainSamples(self, omegas : Np1D = None, timeFinal : Np1D = None, periodResolution : int = 20, name : str = "u", filename : str = "out", forceNewFile : bool = True, folders : bool = False) -> List[str]: """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewTimeDomainSamples( omegas, timeFinal, periodResolution, name, folders, filename, forceNewFile) def setSamples(self, samplingEngine): """Copy samplingEngine and samples.""" vbMng(self, "INIT", "Transfering samples.", 10) self.samplingEngine = copy(samplingEngine) vbMng(self, "DEL", "Done transfering samples.", 10) def setTrainedModel(self, model): """Deepcopy approximation from trained model.""" if hasattr(model, "storeTrainedModel"): verb = model.verbosity model.verbosity = 0 fileOut = model.storeTrainedModel() model.verbosity = verb else: try: fileOut = getNewFilename("trained_model", "pkl") pickleDump(model.data.__dict__, fileOut) except: raise RROMPyException(("Failed to store model data. Parameter " "model must have either " "storeTrainedModel or " "data.__dict__ properties.")) self.loadTrainedModel(fileOut) osrm(fileOut) @abstractmethod def setupApprox(self): """ Setup approximant. (ABSTRACT) Any specialization should include something like if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") ... self.trainedModel = ... self.trainedModel.data = ... self.trainedModel.data.approxParameters = copy( self.approxParameters) """ pass def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None and self.trainedModel.data.approxParameters == self.approxParameters) def _pruneBeforeEval(self, mu:paramList, field:str, append:bool, prune:bool) -> Tuple[paramList, Np1D, Np1D, bool]: mu = checkParameterList(mu, self.npar)[0] idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muExtra = emptyParameterList() lastSolvedMus = getattr(self, "lastSolved" + field) if (len(mu) > 0 and len(mu) == len(lastSolvedMus) and mu == lastSolvedMus): idx = np.arange(len(mu), dtype = np.int) return muExtra, jExtra, idx, True muKeep = copy(muExtra) for j in range(len(mu)): jPos = lastSolvedMus.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: lastSolvedu = getattr(self, "u" + field) idx[~jExtra] = getattr(self.__class__, "set" + field)(self, muKeep, lastSolvedu[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu return muExtra, jExtra, idx, append def _setObject(self, mu:paramList, field:str, object:sampList, append:bool) -> List[int]: newMus = checkParameterList(mu, self.npar)[0] newObj = sampleList(object) if append: getattr(self, "lastSolved" + field).append(newMus) getattr(self, "u" + field).append(newObj) Ltot = len(getattr(self, "u" + field)) return list(range(Ltot - len(newObj), Ltot)) setattr(self, "lastSolved" + field, copy(newMus)) setattr(self, "u" + field, copy(newObj)) return list(range(len(getattr(self, "u" + field)))) def setHF(self, muHF:paramList, uHF:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muHF, "HF", uHF, append) def evalHF(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Find high fidelity solution with original parameters and arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "HF", append, prune) if len(muExtra) > 0: vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15) newuHFs = self.HFEngine.solve(muExtra) vbMng(self, "DEL", "Done solving HF model.", 15) idx[jExtra] = self.setHF(muExtra, newuHFs, append) return list(idx) def setApproxReduced(self, muApproxR:paramList, uApproxR:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApproxR, "ApproxReduced", uApproxR, append) def evalApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "ApproxReduced", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApproxReduced(muExtra) idx[jExtra] = self.setApproxReduced(muExtra, newuApproxs, append) return list(idx) def setApprox(self, muApprox:paramList, uApprox:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApprox, "Approx", uApprox, append) def evalApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "Approx", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApprox(muExtra) idx[jExtra] = self.setApprox(muExtra, newuApproxs, append) return list(idx) def getHF(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get HF solution at arbitrary parameter. Args: mu: Target parameter. Returns: HFsolution. """ mu = checkParameterList(mu, self.npar)[0] idx = self.evalHF(mu, append = append, prune = prune) return self.uHF(idx) def getRHS(self, mu:paramList) -> sampList: """ Get linear system RHS at arbitrary parameter. Args: mu: Target parameter. Returns: Linear system RHS. """ return self.HFEngine.residual(mu, None) def getApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Reduced approximant. """ mu = checkParameterList(mu, self.npar)[0] idx = self.evalApproxReduced(mu, append = append, prune = prune) return self.uApproxReduced(idx) def getApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant. """ mu = checkParameterList(mu, self.npar)[0] idx = self.evalApprox(mu, append = append, prune = prune) return self.uApprox(idx) def getRes(self, mu:paramList) -> sampList: """ Get residual at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant residual. """ if not (self.approx_state or self.HFEngine.isCEye): raise RROMPyException(("Residual of solution with non-scalar C " "not computable.")) return self.HFEngine.residual(mu, self.getApprox(mu) / self.HFEngine.C) def getErr(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get error at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant error. """ return (self.getApprox(mu, append = append, prune =prune) - self.getHF(mu, append = append, prune = prune)) def normApprox(self, mu:paramList) -> float: """ Compute norm of approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of approximant. """ if not (self.POD and self.HFEngine.isCEye): return self.HFEngine.norm(self.getApprox(mu), is_state = False) return np.linalg.norm(self.HFEngine.C * self.getApproxReduced(mu).data, axis = 0) def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ self.setupApprox() vbMng(self, "INIT", "Computing poles of model.", 20) poles = self.trainedModel.getPoles(*args, **kwargs) vbMng(self, "DEL", "Done computing poles.", 20) return poles def storeTrainedModel(self, filenameBase : str = "trained_model", forceNewFile : bool = True) -> str: """Store trained reduced model to file.""" self.setupApprox() vbMng(self, "INIT", "Storing trained model to file.", 20) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) pickleDump(self.trainedModel.data.__dict__, filename) vbMng(self, "DEL", "Done storing trained model.", 20) return filename def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" vbMng(self, "INIT", "Loading pre-trained model from file.", 20) datadict = pickleLoad(filename) self.mu0 = datadict["mu0"] self.scaleFactor = datadict["scaleFactor"] self.mus = datadict["mus"] trainedModel = self.tModelType() trainedModel.verbosity = self.verbosity trainedModel.timestamp = self.timestamp data, selfkeys = self.initializeModelData(datadict) for key in selfkeys: setattr(self, key, datadict.pop(key)) approxParameters = datadict.pop("approxParameters") data.approxParameters = copy(approxParameters) for apkey in data.approxParameters.keys(): self._approxParameters[apkey] = approxParameters.pop(apkey) setattr(self, "_" + apkey, self._approxParameters[apkey]) for key in datadict: setattr(data, key, datadict[key]) trainedModel.data = data self.trainedModel = trainedModel self._mode = RROMPy_FRAGILE vbMng(self, "DEL", "Done loading pre-trained model.", 20) diff --git a/rrompy/reduction_methods/trained_model/trained_model.py b/rrompy/reduction_methods/trained_model/trained_model.py index 0c6a4bb..12108b3 100644 --- a/rrompy/reduction_methods/trained_model/trained_model.py +++ b/rrompy/reduction_methods/trained_model/trained_model.py @@ -1,96 +1,100 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod from rrompy.utilities.base.types import Np1D, paramList, sampList from rrompy.parameter import checkParameterList from rrompy.sampling import sampleList, emptySampleList __all__ = ['TrainedModel'] class TrainedModel: """ ABSTRACT ROM approximant evaluation. Attributes: Data: dictionary with all that can be pickled. """ def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) + def reset(self): + self.lastSolvedApproxReduced = None + self.lastSolvedApprox = None + @property def npar(self): """Number of parameters.""" return self.data.mu0.shape[1] @abstractmethod def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. (ABSTRACT) Args: mu: Target parameter. """ pass def getApprox(self, mu : paramList = []) -> sampList: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApprox") or self.lastSolvedApprox != mu): uApproxR = self.getApproxReduced(mu) self.uApprox = emptySampleList() for i in range(len(mu)): if isinstance(self.data.projMat, (list, sampleList,)): uApp = uApproxR[i][0] * self.data.projMat[0] for j in range(1, uApproxR.shape[0]): uApp += uApproxR[i][j] * self.data.projMat[j] else: uApp = self.data.projMat.dot(uApproxR[i]) if i == 0: #self.data.projMat.shape[0], len(mu) self.uApprox.reset((len(uApp), len(mu)), dtype = uApp.dtype) self.uApprox[i] = uApp self.lastSolvedApprox = mu return self.uApprox @abstractmethod def getPoles(self) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ pass diff --git a/rrompy/reduction_methods/trained_model/trained_model_rational_mls.py b/rrompy/reduction_methods/trained_model/trained_model_rational_mls.py index 180b232..7d8cc36 100644 --- a/rrompy/reduction_methods/trained_model/trained_model_rational_mls.py +++ b/rrompy/reduction_methods/trained_model/trained_model_rational_mls.py @@ -1,175 +1,179 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from .trained_model_rational import TrainedModelRational from rrompy.utilities.base.types import Np1D, paramVal, paramList, sampList from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting.moving_least_squares import mlsweights from rrompy.utilities.poly_fitting.polynomial import ( PolynomialInterpolator as PI) from rrompy.utilities.numerical import customPInv, degreeTotalToFull from rrompy.parameter import checkParameterList from rrompy.sampling import emptySampleList __all__ = ['TrainedModelRationalMLS'] class TrainedModelRationalMLS(TrainedModelRational): """ ROM approximant evaluation for rational moving least squares approximant. Attributes: Data: dictionary with all that can be pickled. """ + def reset(self): + super().reset() + self.lastSetupMu = None + def assembleReducedModel(self, mu:paramVal): if not (hasattr(self.data, "lastSetupMu") and self.data.lastSetupMu == mu): vbMng(self, "INIT", "Assembling reduced model for mu = {}."\ .format(mu), 17) vbMng(self, "INIT", "Starting computation of denominator.", 35) muC = self.centerNormalize(mu) muSC = self.centerNormalize(self.data.mus) wQ = mlsweights(muC, muSC, self.data.radialBasisDen, directionalWeights = self.data.radialWeightsDen, nNearestNeighbor = self.data.nNearestNeighborDen) if self.data.N > self.data.M: PQVan = self.data.QVan else: PQVan = self.data.PVan VQAdjW = PQVan.conj().T * wQ VQAdjWVQ = VQAdjW.dot(PQVan) interpPseudoInverse, info = customPInv(VQAdjWVQ, full = True, rcond = self.data.interpRcond) interpPseudoInverse = interpPseudoInverse.dot(VQAdjW).dot( self.data.QBlocks) if info[0] < interpPseudoInverse.shape[-1]: q = np.zeros(interpPseudoInverse.shape[-1], dtype = np.complex) q[0] = 1. else: halfGram = interpPseudoInverse[self.data.domQIdxs] if self.data.POD: Rstack = halfGram.reshape(-1, halfGram.shape[-1]) vbMng(self, "INIT", "Solving svd for square root of gramian matrix.", 67) _, s, eV = np.linalg.svd(Rstack, full_matrices = False) condN = s[0] / s[-1] q = eV[-1, :].T.conj() vbMng(self, "MAIN", ("Solved svd problem of size {} x {} with condition " "number {:.4e}.").format(*Rstack.shape, condN), 55) vbMng(self, "DEL", "Done solving svd.", 67) else: RRstack = np.tensordot(self.trainedModel.gramian, halfGram, 1).reshape(-1, halfGram.shape[-1]) RLstack = halfGram.reshape(-1, halfGram.shape[-1]) gram = RLstack.T.conj().dot(RRstack) vbMng(self, "INIT", "Solving eigenvalue problem for gramian matrix.", 67) ev, eV = np.linalg.eigh(gram) condN = ev[-1] / ev[0] q = eV[:, 0] vbMng(self, "MAIN", ("Solved eigenvalue problem of size {} with " "condition number {:.4e}.").format(gram.shape[0], condN), 55) vbMng(self, "DEL", "Done solving eigenvalue problem.", 67) self.data.Q = PI() self.data.Q.npar = self.npar self.data.Q.polybasis = self.data.polybasis if self.data.polydegreetype == "TOTAL": self.data.Q.coeffs = degreeTotalToFull( (self.data.N + 1,) * self.npar, self.npar, q) else: self.data.Q.coeffs = q.reshape((self.data.N + 1,) * self.npar) vbMng(self, "DEL", "Done computing denominator.", 35) vbMng(self, "INIT", "Starting computation of numerator.", 35) self.data.P = PI() self.data.P.npar = self.npar self.data.P.polybasis = self.data.polybasis wP = mlsweights(muC, muSC, self.data.radialBasis, directionalWeights = self.data.radialWeights, nNearestNeighbor = self.data.nNearestNeighbor) VAdjW = self.data.PVan.conj().T * wP VAdjWV = VAdjW.dot(self.data.PVan) interpPPseudoInverse = customPInv(VAdjWV, self.data.interpRcond) Pcoeffs = np.tensordot(interpPPseudoInverse.dot(VAdjW), self.data.QBlocks.dot(q), ([1], [1])) if self.data.polydegreetype == "TOTAL": self.data.P.coeffs = degreeTotalToFull( (self.data.M + 1,) * self.npar + (self.data.QBlocks.shape[0],), self.npar, Pcoeffs) else: self.data.P.coeffs = Pcoeffs.reshape( (self.data.M + 1,) * self.npar + (self.data.QBlocks.shape[0],)) vbMng(self, "DEL", "Done computing numerator.", 35) vbMng(self, "DEL", "Done assembling reduced model.", 17) self.data.lastSetupMu = mu def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) self.uApproxReduced = emptySampleList() for i in range(len(mu)): self.assembleReducedModel(mu[i]) vbMng(self, "INIT", "Solving reduced model for mu = {}.".format(mu[i]), 15) uAppR = self.getPVal(mu[i]) / self.getQVal(mu[i]) if i == 0: #self.data.P.shape[-1], len(mu) self.uApproxReduced.reset((len(uAppR), len(mu)), dtype = uAppR.dtype) self.uApproxReduced[i] = uAppR vbMng(self, "DEL", "Done solving reduced model.", 15) vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, *args, mu : paramVal = None, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if mu is None: mu = self.data.mu0 self.assembleReducedModel(mu) return super().getPoles(*args, **kwargs) def getResidues(self, *args, mu : paramVal = None, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ if mu is None: mu = self.data.mu0 self.assembleReducedModel(mu) return super().getResidues(*args, **kwargs) diff --git a/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py b/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py index 975987e..9be201f 100644 --- a/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py +++ b/rrompy/reduction_methods/trained_model/trained_model_reduced_basis.py @@ -1,116 +1,120 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from .trained_model import TrainedModel from rrompy.utilities.base.types import (Np1D, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import (eigvalsNonlinearDense, marginalizePolyList) from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import emptySampleList __all__ = ['TrainedModelReducedBasis'] class TrainedModelReducedBasis(TrainedModel): """ ROM approximant evaluation for RB approximant. Attributes: Data: dictionary with all that can be pickled. """ + def reset(self): + super().reset() + self.lastSetupMu = None + def assembleReducedModel(self, mu:paramVal): mu = checkParameter(mu, self.data.npar) if not (hasattr(self.data, "lastSetupMu") and self.data.lastSetupMu == mu): vbMng(self, "INIT", "Assembling reduced model for mu = {}."\ .format(mu), 17) muEff = mu ** self.data.rescalingExp self.data.ARBmu, self.data.bRBmu = 0., 0. for thA, ARB in zip(self.data.thAs, self.data.ARBs): self.data.ARBmu = (expressionEvaluator(thA[0], muEff) * ARB + self.data.ARBmu) for thb, bRB in zip(self.data.thbs, self.data.bRBs): self.data.bRBmu = (expressionEvaluator(thb[0], muEff) * bRB + self.data.bRBmu) vbMng(self, "DEL", "Done assembling reduced model.", 17) self.data.lastSetupMu = mu def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Computing RB solution at mu = {}.".format(mu), 12) self.uApproxReduced = emptySampleList() for i in range(len(mu)): self.assembleReducedModel(mu[i]) vbMng(self, "INIT", "Solving reduced model for mu = {}.".format(mu[i]), 15) uAppR = np.linalg.solve(self.data.ARBmu, self.data.bRBmu) if i == 0: #self.data.ARBs[0].shape[-1], len(mu) self.uApproxReduced.reset((len(uAppR), len(mu)), dtype = uAppR.dtype) self.uApproxReduced[i] = uAppR vbMng(self, "DEL", "Done solving reduced model.", 15) vbMng(self, "DEL", "Done computing RB solution.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, marginalVals : ListAny = [fp], jSupp : int = 1, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if not self.data.affinePoly: RROMPyWarning(("Unable to compute approximate poles due " "to parametric dependence (detected non-" "polynomial). Change HFEngine.affinePoly to True " "if necessary.")) return if not hasattr(marginalVals, "__len__"): marginalVals = [marginalVals] mVals = list(marginalVals) try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) ARBs = self.data.ARBs if self.data.npar > 1: mVals[rDim] = self.data.mu0(rDim) mVals = checkParameter(mVals).data.flatten() mVals[rDim] = fp ARBs = marginalizePolyList(ARBs, mVals, "auto") ev = eigvalsNonlinearDense(ARBs, jSupp = jSupp, **kwargs) return np.sort(np.power(ev, 1. / self.data.rescalingExp[rDim]))