diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py
index 294f1ed..12718ee 100644
--- a/rrompy/reduction_methods/base/generic_approximant.py
+++ b/rrompy/reduction_methods/base/generic_approximant.py
@@ -1,868 +1,886 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from abc import abstractmethod
import numpy as np
from itertools import product as iterprod
from copy import deepcopy as copy
from os import remove as osrm
from rrompy.sampling.standard import (SamplingEngineStandard,
SamplingEngineStandardPOD)
from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, Tuple,
ListAny, strLst, paramVal, paramList,
sampList)
from rrompy.utilities.base import (purgeDict, verbosityManager as vbMng,
getNewFilename)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPy_READY, RROMPy_FRAGILE)
from rrompy.utilities.base import pickleDump, pickleLoad
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import sampleList, emptySampleList
__all__ = ['GenericApproximant']
def addNormFieldToClass(self, fieldName):
- def objFunc(self, mu:paramList, homogeneized : bool = False) -> Np1D:
+ def objFunc(self, mu:paramList, *args, homogeneized : bool = False,
+ **kwargs) -> Np1D:
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
- val = self.HFEngine.norm(uV)
+ val = self.HFEngine.norm(uV, *args, **kwargs)
+ return val
+ setattr(self.__class__, "norm" + fieldName, objFunc)
+
+def addNormDualFieldToClass(self, fieldName):
+ def objFunc(self, mu:paramList, *args, homogeneized : bool = False,
+ duality : bool = True, **kwargs) -> Np1D:
+ uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized,
+ duality)
+ val = self.HFEngine.norm(uV, *args, **kwargs)
return val
setattr(self.__class__, "norm" + fieldName, objFunc)
def addPlotFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, *args, homogeneized : bool = False,
**kwargs):
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
kwargsCopy = copy(kwargs)
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
self.HFEngine.plot(u, *args, **kwargs)
setattr(self.__class__, "plot" + fieldName, objFunc)
def addOutParaviewFieldToClass(self, fieldName):
def objFunc(self, mu:paramVal, *args, homogeneized : bool = False,
**kwargs):
if not hasattr(self.HFEngine, "outParaview"):
raise RROMPyException(("High fidelity engine cannot output to "
"Paraview."))
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
kwargsCopy = copy(kwargs)
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
self.HFEngine.outParaview(u, *args, **kwargsCopy)
setattr(self.__class__, "outParaview" + fieldName, objFunc)
def addOutParaviewTimeDomainFieldToClass(self, fieldName):
def objFunc(self, mu:paramVal, *args,
homogeneized : bool = False, **kwargs):
if not hasattr(self.HFEngine, "outParaviewTimeDomain"):
raise RROMPyException(("High fidelity engine cannot output to "
"Paraview."))
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
omega = args.pop(0) if len(args) > 0 else np.real(mu)
kwargsCopy = copy(kwargs)
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
self.HFEngine.outParaviewTimeDomain(u, omega, *args,
**kwargsCopy)
setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc)
class GenericApproximant:
"""
ABSTRACT
ROM approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
trainedModel: Trained model evaluator.
mu0: Default parameter.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList{Soft,Critical}.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
S: Number of solution snapshots over which current approximant is
based upon.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
__all__ += [ftype + dtype for ftype, dtype in iterprod(
["norm", "plot", "outParaview", "outParaviewTimeDomain"],
["HF", "RHS", "Approx", "Res", "Err"])]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._mode = RROMPy_READY
self.verbosity = verbosity
self.timestamp = timestamp
vbMng(self, "INIT",
"Initializing engine of type {}.".format(self.name()), 10)
self._HFEngine = HFEngine
self.trainedModel = None
self.lastSolvedHF = emptyParameterList()
self.uHF = emptySampleList()
self._addParametersToList(["POD"], [True], ["S"], [[1]])
if mu0 is None:
if hasattr(self.HFEngine, "mu0"):
self.mu0 = checkParameter(self.HFEngine.mu0)
else:
raise RROMPyException(("Center of approximation cannot be "
"inferred from HF engine. Parameter "
"required"))
else:
self.mu0 = checkParameter(mu0, self.HFEngine.npar)
self.resetSamples()
self.homogeneized = homogeneized
self.approxParameters = approxParameters
self._postInit()
- ### add norm{HF,RHS,Approx,Res,Err} methods
+ ### add norm{HF,Approx,Err} methods
"""
Compute norm of * at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Target norm of *.
"""
- for objName in ["HF", "RHS", "Err"]:
+ for objName in ["HF", "Err"]:
addNormFieldToClass(self, objName)
- def objFunc(self, mu:paramList, homogeneized : bool = False) -> Np1D:
-# uV = getattr(self.__class__, "getRes")(self, mu, homogeneized,
-# duality = False)
- uV = self.getRes(mu, homogeneized, duality = False)
- val = self.HFEngine.norm(uV, dual = True, duality = False)
- return val
- setattr(self.__class__, "normRes", objFunc)
if not hasattr(self, "normApprox"):
addNormFieldToClass(self, "Approx")
+ ### add norm{RHS,Res} methods
+ """
+ Compute norm of * at arbitrary parameter.
+ Args:
+ mu: Target parameter.
+ homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
+ False.
+ duality(optional): Whether to compute duality of object.
+ Returns:
+ Target norm of *.
+ """
+ for objName in ["RHS", "Res"]:
+ addNormDualFieldToClass(self, objName)
+
### add plot{HF,RHS,Approx,Res,Err} methods
"""
Do some nice plots of * at arbitrary parameter.
Args:
mu: Target parameter.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addPlotFieldToClass(self, objName)
### add outParaview{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file.
Args:
mu: Target parameter.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
time(optional): Timestamp.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
filePW(optional): Fenics File entity (for time series).
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewFieldToClass(self, objName)
### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file, converted to time domain.
Args:
mu: Target parameter.
omega(optional): frequency.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewTimeDomainFieldToClass(self, objName)
def _preInit(self):
if not hasattr(self, "depth"): self.depth = 0
else: self.depth += 1
@property
def parameterList(self):
"""Value of parameterListSoft + parameterListCritical."""
return self.parameterListSoft + self.parameterListCritical
def _addParametersToList(self, whatSoft:strLst, defaultSoft:ListAny,
whatCritical : strLst = [],
defaultCritical : ListAny = [],
toBeExcluded : strLst = []):
if not hasattr(self, "parameterToBeExcluded"):
self.parameterToBeExcluded = []
- self.parameterToBeExcluded += toBeExcluded
+ self.parameterToBeExcluded = toBeExcluded + self.parameterToBeExcluded
if not hasattr(self, "parameterListSoft"):
self.parameterListSoft = []
if not hasattr(self, "parameterDefaultSoft"):
self.parameterDefaultSoft = {}
if not hasattr(self, "parameterListCritical"):
self.parameterListCritical = []
if not hasattr(self, "parameterDefaultCritical"):
self.parameterDefaultCritical = {}
for j, what in enumerate(whatSoft):
if what not in self.parameterToBeExcluded:
- self.parameterListSoft += [what]
+ self.parameterListSoft = [what] + self.parameterListSoft
self.parameterDefaultSoft[what] = defaultSoft[j]
for j, what in enumerate(whatCritical):
if what not in self.parameterToBeExcluded:
- self.parameterListCritical += [what]
+ self.parameterListCritical = ([what]
+ + self.parameterListCritical)
self.parameterDefaultCritical[what] = defaultCritical[j]
def _postInit(self):
if self.depth == 0:
vbMng(self, "DEL", "Done initializing.", 10)
del self.depth
else: self.depth -= 1
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def setupSampling(self):
"""Setup sampling engine."""
RROMPyAssert(self._mode, message = "Cannot setup sampling engine.")
if not hasattr(self, "_POD") or self._POD is None: return
if self.POD:
SamplingEngine = SamplingEngineStandardPOD
else:
SamplingEngine = SamplingEngineStandard
self.samplingEngine = SamplingEngine(self.HFEngine,
verbosity = self.verbosity,
allowRepeatedSamples = True)
@property
def HFEngine(self):
"""Value of HFEngine."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
raise RROMPyException("Cannot change HFEngine.")
@property
def mu0(self):
"""Value of mu0."""
return self._mu0
@mu0.setter
def mu0(self, mu0):
mu0 = checkParameter(mu0)
if not hasattr(self, "_mu0") or mu0 != self.mu0:
self.resetSamples()
self._mu0 = mu0
@property
def npar(self):
"""Number of parameters."""
return self.mu0.shape[1]
@property
def approxParameters(self):
"""Value of approximant parameters."""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
if not hasattr(self, "approxParameters"):
self._approxParameters = {}
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
keyList = list(approxParameters.keys())
for key in self.parameterListCritical:
if key in keyList:
setattr(self, "_" + key, self.parameterDefaultCritical[key])
for key in self.parameterListSoft:
if key in keyList:
setattr(self, "_" + key, self.parameterDefaultSoft[key])
fragile = False
for key in self.parameterListCritical:
if key in keyList:
val = approxParameters[key]
else:
val = getattr(self, "_" + key, None)
if val is None:
val = self.parameterDefaultCritical[key]
getattr(self.__class__, key, None).fset(self, val)
fragile = fragile or val is None
for key in self.parameterListSoft:
if key in keyList:
val = approxParameters[key]
else:
val = getattr(self, "_" + key, None)
if val is None:
val = self.parameterDefaultSoft[key]
getattr(self.__class__, key, None).fset(self, val)
if fragile:
self._mode = RROMPy_FRAGILE
@property
def POD(self):
"""Value of POD."""
return self._POD
@POD.setter
def POD(self, POD):
if hasattr(self, "_POD"): PODold = self.POD
else: PODold = -1
self._POD = POD
self._approxParameters["POD"] = self.POD
if PODold != self.POD:
self.samplingEngine = None
self.resetSamples()
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
if not hasattr(S, "__len__"): S = [S]
if any([s <= 0 for s in S]):
raise RROMPyException("S must be positive.")
if hasattr(self, "_S") and self._S is not None: Sold = tuple(self.S)
else: Sold = -1
self._S = S
self._approxParameters["S"] = self.S
if Sold != tuple(self.S):
self.resetSamples()
@property
def homogeneized(self):
"""Value of homogeneized."""
return self._homogeneized
@homogeneized.setter
def homogeneized(self, homogeneized):
if not hasattr(self, "_homogeneized"):
self._homogeneized = None
if homogeneized != self.homogeneized:
self._homogeneized = homogeneized
self.resetSamples()
@property
def trainedModel(self):
"""Value of trainedModel."""
return self._trainedModel
@trainedModel.setter
def trainedModel(self, trainedModel):
self._trainedModel = trainedModel
if self._trainedModel is not None:
self._trainedModel.lastSolvedApproxReduced = emptyParameterList()
self._trainedModel.lastSolvedApprox = emptyParameterList()
self.lastSolvedApproxReduced = emptyParameterList()
self.lastSolvedApprox = emptyParameterList()
self.uApproxReduced = emptySampleList()
self.uApprox = emptySampleList()
def resetSamples(self):
if hasattr(self, "samplingEngine") and self.samplingEngine is not None:
self.samplingEngine.resetHistory()
else:
self.setupSampling()
self._mode = RROMPy_READY
def plotSamples(self, warping : List[callable] = None, name : str = "u",
save : str = None, what : strLst = 'all',
saveFormat : str = "eps", saveDPI : int = 100,
show : bool = True, plotArgs : dict = {}, **figspecs):
"""
Do some nice plots of the samples.
Args:
warping(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
plotArgs(optional): Optional arguments for fen/pyplot.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
RROMPyAssert(self._mode, message = "Cannot plot samples.")
self.samplingEngine.plotSamples(warping, name, save, what, saveFormat,
saveDPI, show, plotArgs, **figspecs)
def outParaviewSamples(self, name : str = "u", filename : str = "out",
times : Np1D = None, what : strLst = 'all',
forceNewFile : bool = True, folders : bool = False,
filePW = None):
"""
Output samples to ParaView file.
Args:
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
times(optional): Timestamps.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
folders(optional): Whether to split output in folders.
filePW(optional): Fenics File entity (for time series).
"""
RROMPyAssert(self._mode, message = "Cannot output samples.")
self.samplingEngine.outParaviewSamples(name = name,
filename = filename,
times = times, what = what,
forceNewFile = forceNewFile,
folders = folders,
filePW = filePW)
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
timeFinal : Np1D = None,
periodResolution : int = 20,
name : str = "u",
filename : str = "out",
forceNewFile : bool = True,
folders : bool = False):
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
folders(optional): Whether to split output in folders.
"""
RROMPyAssert(self._mode, message = "Cannot output samples.")
self.samplingEngine.outParaviewTimeDomainSamples(omegas = omegas,
timeFinal = timeFinal,
periodResolution = periodResolution,
name = name, filename = filename,
forceNewFile = forceNewFile,
folders = folders)
def setSamples(self, samplingEngine):
"""Copy samplingEngine and samples."""
vbMng(self, "INIT", "Transfering samples.", 10)
self.samplingEngine = copy(samplingEngine)
vbMng(self, "DEL", "Done transfering samples.", 10)
def setTrainedModel(self, model):
"""Deepcopy approximation from trained model."""
if hasattr(model, "storeTrainedModel"):
verb = model.verbosity
model.verbosity = 0
fileOut = model.storeTrainedModel()
model.verbosity = verb
else:
try:
fileOut = getNewFilename("trained_model", "pkl")
pickleDump(model.data.__dict__, fileOut)
except:
raise RROMPyException(("Failed to store model data. Parameter "
"model must have either "
"storeTrainedModel or "
"data.__dict__ properties."))
self.loadTrainedModel(fileOut)
osrm(fileOut)
@abstractmethod
def setupApprox(self):
"""
Setup approximant. (ABSTRACT)
Any specialization should include something like
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
...
self.trainedModel = ...
self.trainedModel.data = ...
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
"""
pass
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None
and self.trainedModel.data.approxParameters == self.approxParameters)
def _pruneBeforeEval(self, mu:paramList, field:str, append:bool,
prune:bool) -> Tuple[paramList, Np1D, Np1D, bool]:
mu = checkParameterList(mu, self.npar)[0]
idx = np.empty(len(mu), dtype = np.int)
if prune:
jExtra = np.zeros(len(mu), dtype = bool)
muExtra = emptyParameterList()
lastSolvedMus = getattr(self, "lastSolved" + field)
if (len(mu) > 0 and len(mu) == len(lastSolvedMus)
and mu == lastSolvedMus):
idx = np.arange(len(mu), dtype = np.int)
return muExtra, jExtra, idx, True
muKeep = copy(muExtra)
for j in range(len(mu)):
jPos = lastSolvedMus.find(mu[j])
if jPos is not None:
idx[j] = jPos
muKeep.append(mu[j])
else:
jExtra[j] = True
muExtra.append(mu[j])
if len(muKeep) > 0 and not append:
lastSolvedu = getattr(self, "u" + field)
idx[~jExtra] = getattr(self.__class__, "set" + field)(self,
muKeep, lastSolvedu[idx[~jExtra]], append)
append = True
else:
jExtra = np.ones(len(mu), dtype = bool)
muExtra = mu
return muExtra, jExtra, idx, append
def _setObject(self, mu:paramList, field:str, object:sampList,
append:bool) -> List[int]:
newMus = checkParameterList(mu, self.npar)[0]
newObj = sampleList(object)
if append:
getattr(self, "lastSolved" + field).append(newMus)
getattr(self, "u" + field).append(newObj)
Ltot = len(getattr(self, "u" + field))
return list(range(Ltot - len(newObj), Ltot))
setattr(self, "lastSolved" + field, copy(newMus))
setattr(self, "u" + field, copy(newObj))
return list(range(len(getattr(self, "u" + field))))
def setHF(self, muHF:paramList, uHF:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
return self._setObject(muHF, "HF", uHF, append)
def evalHF(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Find high fidelity solution with original parameters and arbitrary
parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "HF", append,
prune)
if len(muExtra) > 0:
vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu),
15)
newuHFs = self.HFEngine.solve(muExtra,
homogeneized = self.homogeneized)
vbMng(self, "DEL", "Done solving HF model.", 15)
idx[jExtra] = self.setHF(muExtra, newuHFs, append)
return list(idx)
def setApproxReduced(self, muApproxR:paramList, uApproxR:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
return self._setObject(muApproxR, "ApproxReduced", uApproxR, append)
def evalApproxReduced(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
self.setupApprox()
muExtra, jExtra, idx, append = self._pruneBeforeEval(mu,
"ApproxReduced",
append, prune)
if len(muExtra) > 0:
newuApproxs = self.trainedModel.getApproxReduced(muExtra)
idx[jExtra] = self.setApproxReduced(muExtra, newuApproxs, append)
return list(idx)
def setApprox(self, muApprox:paramList, uApprox:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
return self._setObject(muApprox, "Approx", uApprox, append)
def evalApprox(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Evaluate approximant at arbitrary parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
self.setupApprox()
muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "Approx",
append, prune)
if len(muExtra) > 0:
newuApproxs = self.trainedModel.getApprox(muExtra)
idx[jExtra] = self.setApprox(muExtra, newuApproxs, append)
return list(idx)
def getHF(self, mu:paramList, homogeneized : bool = False,
append : bool = False, prune : bool = True) -> sampList:
"""
Get HF solution at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
HFsolution.
"""
mu = checkParameterList(mu, self.npar)[0]
idx = self.evalHF(mu, append = append, prune = prune)
uHFs = self.uHF(idx)
if self.homogeneized and not homogeneized:
for j, m in enumerate(mu):
uHFs[j] += self.HFEngine.liftDirichletData(m)
if not self.homogeneized and homogeneized:
for j, m in enumerate(mu):
uHFs[j] -= self.HFEngine.liftDirichletData(m)
return uHFs
def getRHS(self, mu:paramList, homogeneized : bool = False,
duality : bool = True) -> sampList:
"""
Get linear system RHS at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Linear system RHS.
"""
return self.HFEngine.residual(None, mu, homogeneized = homogeneized,
duality = duality)
def getApproxReduced(self, mu:paramList, append : bool = False,
prune : bool = True) -> sampList:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Reduced approximant.
"""
mu = checkParameterList(mu, self.npar)[0]
idx = self.evalApproxReduced(mu, append = append, prune = prune)
uApproxRs = self.uApproxReduced(idx)
return uApproxRs
def getApprox(self, mu:paramList, homogeneized : bool = False,
append : bool = False, prune : bool = True) -> sampList:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant.
"""
mu = checkParameterList(mu, self.npar)[0]
idx = self.evalApprox(mu, append = append, prune = prune)
uApproxs = self.uApprox(idx)
if self.homogeneized and not homogeneized:
for j, m in enumerate(mu):
uApproxs[j] += self.HFEngine.liftDirichletData(m)
if not self.homogeneized and homogeneized:
for j, m in enumerate(mu):
uApproxs[j] -= self.HFEngine.liftDirichletData(m)
return uApproxs
def getRes(self, mu:paramList, homogeneized : bool = False,
duality : bool = True) -> sampList:
"""
Get residual at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant residual.
"""
return self.HFEngine.residual(self.getApprox(mu, homogeneized), mu,
homogeneized = homogeneized,
duality = duality)
def getErr(self, mu:paramList, homogeneized : bool = False,
append : bool = False, prune : bool = True) -> sampList:
"""
Get error at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant error.
"""
return (self.getApprox(mu, homogeneized, append = append, prune =prune)
- self.getHF(mu, homogeneized, append = append, prune = prune))
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
self.setupApprox()
vbMng(self, "INIT", "Computing poles of model.", 20)
poles = self.trainedModel.getPoles(*args, **kwargs)
vbMng(self, "DEL", "Done computing poles.", 20)
return poles
def storeTrainedModel(self, filenameBase : str = "trained_model",
forceNewFile : bool = True) -> str:
"""Store trained reduced model to file."""
self.setupApprox()
vbMng(self, "INIT", "Storing trained model to file.", 20)
if forceNewFile:
filename = getNewFilename(filenameBase, "pkl")
else:
filename = "{}.pkl".format(filenameBase)
pickleDump(self.trainedModel.data.__dict__, filename)
vbMng(self, "DEL", "Done storing trained model.", 20)
return filename
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
vbMng(self, "INIT", "Loading pre-trained model from file.", 20)
datadict = pickleLoad(filename)
name = datadict.pop("name")
if name == "TrainedModelRational":
from rrompy.reduction_methods.trained_model import \
TrainedModelRational as tModel
elif name == "TrainedModelReducedBasis":
from rrompy.reduction_methods.trained_model import \
TrainedModelReducedBasis as tModel
else:
raise RROMPyException(("Trained model name not recognized. "
"Loading failed."))
self.mu0 = datadict.pop("mu0")
from rrompy.reduction_methods.trained_model import TrainedModelData
trainedModel = tModel()
trainedModel.verbosity = self.verbosity
trainedModel.timestamp = self.timestamp
self.scaleFactor = datadict.pop("scaleFactor")
data = TrainedModelData(name, self.mu0, datadict.pop("projMat"),
self.scaleFactor, datadict.pop("rescalingExp"))
if "mus" in datadict:
data.mus = datadict.pop("mus")
approxParameters = datadict.pop("approxParameters")
data.approxParameters = copy(approxParameters)
if "sampler" in approxParameters:
self._approxParameters["sampler"] = approxParameters.pop("sampler")
self.approxParameters = copy(approxParameters)
if "mus" in data.__dict__:
self.mus = copy(data.mus)
for key in datadict:
setattr(data, key, datadict[key])
trainedModel.data = data
self.trainedModel = trainedModel
self._mode = RROMPy_FRAGILE
vbMng(self, "DEL", "Done loading pre-trained model.", 20)
diff --git a/rrompy/reduction_methods/base/rational_interpolant_utils.py b/rrompy/reduction_methods/base/rational_interpolant_utils.py
index d4bcfa6..b28956f 100644
--- a/rrompy/reduction_methods/base/rational_interpolant_utils.py
+++ b/rrompy/reduction_methods/base/rational_interpolant_utils.py
@@ -1,35 +1,32 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from rrompy.utilities.base.types import Np1D
from rrompy.utilities.exception_manager import RROMPyWarning
__all__ = ['checkRobustTolerance']
-def checkRobustTolerance(ev:Np1D, N:int, tol:float) -> dict:
+def checkRobustTolerance(ev:Np1D, tol:float) -> dict:
"""
Perform robustness check on eigen-/singular values and return reduced
parameters with warning.
"""
ts = tol * np.linalg.norm(ev)
- if len(ev) <= np.sum(np.abs(ev) >= ts) + 1: return {}
- RROMPyWarning(("Smallest {} eigenvalues below tolerance. Reducing N by "
- "1.").format(len(ev) - np.sum(np.abs(ev) >= ts)))
- return {"N" : N - 1}
+ return len(ev) - np.sum(np.abs(ev) >= ts)
diff --git a/rrompy/reduction_methods/distributed/rational_interpolant.py b/rrompy/reduction_methods/distributed/rational_interpolant.py
index 7439332..928ee58 100644
--- a/rrompy/reduction_methods/distributed/rational_interpolant.py
+++ b/rrompy/reduction_methods/distributed/rational_interpolant.py
@@ -1,550 +1,559 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from .generic_distributed_approximant import GenericDistributedApproximant
from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname,
nextDerivativeIndices,
hashDerivativeToIdx as hashD,
hashIdxToDerivative as hashI,
homogeneizedpolyvander as hpvP,
homogeneizedToFull,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (rbbases,
RadialBasisInterpolator as RBI)
from rrompy.reduction_methods.trained_model import (
TrainedModelRational as tModel)
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple,
List, paramVal, paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import multifactorial, customPInv
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
__all__ = ['RationalInterpolant']
class RationalInterpolant(GenericDistributedApproximant):
"""
ROM rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator;
- 'polybasis': type of polynomial basis for interpolation; allowed
values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults
to 'MONOMIAL';
- 'E': number of derivatives used to compute interpolant; defaults
to 0;
- 'M': degree of rational interpolant numerator; defaults to 0;
- 'N': degree of rational interpolant denominator; defaults to 0;
- 'radialBasis': radial basis family for interpolant numerator;
defaults to 0, i.e. no radial basis;
- 'radialBasisWeights': radial basis weights for interpolant
numerator; defaults to 0, i.e. identity;
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'polybasis': type of polynomial basis for interpolation;
- 'E': number of derivatives used to compute interpolant;
- 'M': degree of rational interpolant numerator;
- 'N': degree of rational interpolant denominator;
- 'radialBasis': radial basis family for interpolant numerator;
- 'radialBasisWeights': radial basis weights for interpolant
numerator;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
POD: Whether to compute POD of snapshots.
S: Number of solution snapshots over which current approximant is
based upon.
sampler: Sample point generator.
polybasis: type of polynomial basis for interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
radialBasis: Radial basis family for interpolant numerator.
radialBasisWeights: Radial basis weights for interpolant numerator.
interpRcond: Tolerance for interpolation via numpy.polyfit.
robustTol: Tolerance for robust rational denominator management.
E: Complete derivative depth level of S.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["polybasis", "E", "M", "N", "radialBasis",
"radialBasisWeights", "interpRcond",
"robustTol"],
["MONOMIAL", -1, 0, 0, 0, 1, -1, 0])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
+ self.catchInstability = False
self._postInit()
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Prescribed polybasis not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def radialBasis(self):
"""Value of radialBasis."""
return self._radialBasis
@radialBasis.setter
def radialBasis(self, radialBasis):
try:
if radialBasis != 0:
radialBasis = radialBasis.upper().strip().replace(" ","")
if radialBasis not in rbbases:
raise RROMPyException(("Prescribed radialBasis not "
"recognized."))
self._radialBasis = radialBasis
except:
RROMPyWarning(("Prescribed radialBasis not recognized. Overriding "
"to 0."))
self._radialBasis = 0
self._approxParameters["radialBasis"] = self.radialBasis
@property
def polybasisP(self):
if self.radialBasis == 0:
return self._polybasis
return self._polybasis + "_" + self.radialBasis
@property
def interpRcond(self):
"""Value of interpRcond."""
return self._interpRcond
@interpRcond.setter
def interpRcond(self, interpRcond):
self._interpRcond = interpRcond
self._approxParameters["interpRcond"] = self.interpRcond
@property
def radialBasisWeights(self):
"""Value of radialBasisWeights."""
return self._radialBasisWeights
@radialBasisWeights.setter
def radialBasisWeights(self, radialBasisWeights):
self._radialBasisWeights = radialBasisWeights
self._approxParameters["radialBasisWeights"] = self.radialBasisWeights
@property
def M(self):
"""Value of M. Its assignment may change S."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
if hasattr(self, "_E") and self.E >= 0 and self.E < self.M:
RROMPyWarning("Prescribed S is too small. Decreasing M.")
self.M = self.E
@property
def N(self):
"""Value of N. Its assignment may change S."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
if hasattr(self, "_E") and self.E >= 0 and self.E < self.N:
RROMPyWarning("Prescribed S is too small. Decreasing N.")
self.N = self.E
@property
def E(self):
"""Value of E."""
return self._E
@E.setter
def E(self, E):
if E < 0:
if not hasattr(self, "_S"):
raise RROMPyException(("Value of E must be positive if S is "
"not yet assigned."))
E = np.sum(hashI(np.prod(self.S), self.npar)) - 1
self._E = E
self._approxParameters["E"] = self.E
if (hasattr(self, "_S")
and self.E >= np.sum(hashI(np.prod(self.S), self.npar))):
RROMPyWarning("Prescribed S is too small. Decreasing E.")
self.E = -1
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
@property
def robustTol(self):
"""Value of tolerance for robust rational denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
GenericDistributedApproximant.S.fset(self, S)
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
if hasattr(self, "_E"): self.E = self.E
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musUniqueCN = None
self._derIdxs = None
self._reorder = None
def _setupInterpolationIndices(self):
"""Setup parameters for polyvander."""
RROMPyAssert(self._mode,
message = "Cannot setup interpolation indices.")
if self._musUniqueCN is None or len(self._reorder) != len(self.mus):
self._musUniqueCN, musIdxsTo, musIdxs, musCount = (
self.centerNormalize(self.mus).unique(return_index = True,
return_inverse = True,
return_counts = True))
self._musUnique = self.mus[musIdxsTo]
self._derIdxs = [None] * len(self._musUniqueCN)
self._reorder = np.empty(len(musIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musCount):
self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1],
cnt)
jIdx = np.nonzero(musIdxs == j)[0]
self._reorder[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupDenominator(self):
"""Compute rational denominator."""
RROMPyAssert(self._mode, message = "Cannot setup denominator.")
vbMng(self, "INIT", "Starting computation of denominator.", 7)
while self.N > 0:
- invD = self._computeInterpolantInverseBlocks()
+ invD, fitinv = self._computeInterpolantInverseBlocks()
if self.POD:
ev, eV = self.findeveVGQR(self.samplingEngine.RPOD, invD)
else:
ev, eV = self.findeveVGExplicit(self.samplingEngine.samples,
invD)
- newParams = checkRobustTolerance(ev, self.N, self.robustTol)
- if not newParams:
- break
- self.approxParameters = newParams
+ nevBad = checkRobustTolerance(ev, self.robustTol)
+ if nevBad <= 1: break
+ if self.catchInstability:
+ raise RROMPyException(("Instability in denominator "
+ "computation: eigenproblem is poorly "
+ "conditioned."))
+ RROMPyWarning(("Smallest {} eigenvalues below tolerance. Reducing "
+ "N by 1.").format(nevBad))
+ self.N = self.N - 1
if self.N <= 0:
self._N = 0
eV = np.ones((1, 1))
q = PI()
q.npar = self.npar
q.polybasis = self.polybasis
q.coeffs = homogeneizedToFull(tuple([self.N + 1] * self.npar),
self.npar, eV[:, 0])
vbMng(self, "DEL", "Done computing denominator.", 7)
- return q
+ return q, fitinv
def _setupNumerator(self):
"""Compute rational numerator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
vbMng(self, "INIT", "Starting computation of numerator.", 7)
Qevaldiag = np.zeros((len(self.mus), len(self.mus)),
dtype = np.complex)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
self._setupInterpolationIndices()
idxGlob = 0
for j, derIdxs in enumerate(self._derIdxs):
nder = len(derIdxs)
idxLoc = np.arange(len(self.mus))[(self._reorder >= idxGlob)
* (self._reorder < idxGlob + nder)]
idxGlob += nder
Qval = [0] * nder
for der in range(nder):
derIdx = hashI(der, self.npar)
Qval[der] = (self.trainedModel.getQVal(
self._musUnique[j], derIdx,
scl = np.power(self.scaleFactor, -1.))
/ multifactorial(derIdx))
for derU, derUIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)]
if all([x >= 0 for x in diffIdx]):
diffj = hashD(diffIdx)
Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj]
if self.POD:
Qevaldiag = Qevaldiag.dot(self.samplingEngine.RPOD.T)
self.trainedModel.verbosity = verb
while self.M >= 0:
if self.radialBasis == 0:
p = PI()
wellCond, msg = p.setupByInterpolation(
self._musUniqueCN, Qevaldiag, self.M,
self.polybasisP, self.verbosity >= 5, True,
{"derIdxs": self._derIdxs, "reorder": self._reorder,
"scl": np.power(self.scaleFactor, -1.)},
{"rcond": self.interpRcond})
else:
p = RBI()
wellCond, msg = p.setupByInterpolation(
self._musUniqueCN, Qevaldiag, self.M, self.polybasisP,
self.radialBasisWeights, self.verbosity >= 5, True,
{"derIdxs": self._derIdxs, "reorder": self._reorder,
"scl": np.power(self.scaleFactor, -1.)},
{"rcond": self.interpRcond})
vbMng(self, "MAIN", msg, 5)
if wellCond: break
+ if self.catchInstability:
+ raise RROMPyException(("Instability in numerator computation: "
+ "polyfit is poorly conditioned."))
RROMPyWarning("Polyfit is poorly conditioned. Reducing M by 1.")
self.M -= 1
if self.M < 0:
raise RROMPyException(("Instability in computation of numerator. "
"Aborting."))
vbMng(self, "DEL", "Done computing numerator.", 7)
return p
def setupApprox(self):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.computeScaleFactor()
self.computeSnapshots()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samples,
self.scaleFactor,
self.HFEngine.rescalingExp)
data.mus = copy(self.mus)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
if self.N > 0:
- Q = self._setupDenominator()
+ Q = self._setupDenominator()[0]
else:
Q = PI()
Q.coeffs = np.ones(tuple([1] * self.npar), dtype = np.complex)
Q.npar = self.npar
Q.polybasis = self.polybasis
self.trainedModel.data.Q = Q
self.trainedModel.data.P = self._setupNumerator()
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
- def _computeInterpolantInverseBlocks(self) -> List[Np2D]:
+ def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]:
"""
Compute inverse factors for minimal interpolant target functional.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self._setupInterpolationIndices()
while self.E >= 0:
eWidth = (hashD([self.E + 1] + [0] * (self.npar - 1))
- hashD([self.E] + [0] * (self.npar - 1)))
TE, _, argIdxs = hpvP(self._musUniqueCN, self.E, self.polybasis,
self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
fitOut = customPInv(TE[:, argIdxs], rcond = self.interpRcond,
full = True)
vbMng(self, "MAIN",
("Fitting {} samples with degree {} through {}... "
"Conditioning of pseudoinverse system: {:.4e}.").format(
TE.shape[0], self.E,
polyfitname(self.polybasis),
fitOut[1][1][0] / fitOut[1][1][-1]),
5)
if fitOut[1][0] == len(argIdxs):
- self._fitinv = fitOut[0][- eWidth : , :]
+ fitinv = fitOut[0][- eWidth : , :]
break
RROMPyWarning("Polyfit is poorly conditioned. Reducing E by 1.")
self.E -= 1
if self.E < 0:
raise RROMPyException(("Instability in computation of "
"denominator. Aborting."))
TN, _, argIdxs = hpvP(self._musUniqueCN, self.N, self.polybasis,
self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
TN = TN[:, argIdxs]
invD = [None] * (eWidth)
for k in range(eWidth):
- pseudoInv = np.diag(self._fitinv[k, :])
+ pseudoInv = np.diag(fitinv[k, :])
idxGlob = 0
for j, derIdxs in enumerate(self._derIdxs):
nder = len(derIdxs)
idxGlob += nder
if nder > 1:
idxLoc = np.arange(len(self.mus))[
(self._reorder >= idxGlob - nder)
* (self._reorder < idxGlob)]
- invLoc = self._fitinv[k, idxLoc]
+ invLoc = fitinv[k, idxLoc]
pseudoInv[np.ix_(idxLoc, idxLoc)] = 0.
for diffj, diffjIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
derUIdx = [x - y for (x, y) in
zip(diffjIdx, derQIdx)]
if all([x >= 0 for x in derUIdx]):
derU = hashD(derUIdx)
pseudoInv[idxLoc[derU], idxLoc[derQ]] = (
invLoc[diffj])
invD[k] = pseudoInv.dot(TN)
- return invD
+ return invD, fitinv
def findeveVGExplicit(self, sampleE:sampList,
invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of rational denominator
matrix.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
eWidth = len(invD)
vbMng(self, "INIT", "Building gramian matrix.", 10)
gramian = self.HFEngine.innerProduct(sampleE, sampleE)
G = np.zeros((nEnd, nEnd), dtype = np.complex)
for k in range(eWidth):
G += invD[k].T.conj().dot(gramian.dot(invD[k]))
vbMng(self, "DEL", "Done building gramian.", 10)
vbMng(self, "INIT", "Solving eigenvalue problem for gramian matrix.",
7)
ev, eV = np.linalg.eigh(G)
vbMng(self, "MAIN",
("Solved eigenvalue problem of size {} with condition number "
"{:.4e}.").format(nEnd, ev[-1] / ev[0]), 5)
vbMng(self, "DEL", "Done solving eigenvalue problem.", 7)
return ev, eV
def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of rational denominator matrix
through SVD of R factor.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
S = RPODE.shape[0]
eWidth = len(invD)
vbMng(self, "INIT", "Building half-gramian matrix stack.", 10)
Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex)
for k in range(eWidth):
Rstack[k * S : (k + 1) * S, :] = RPODE.dot(invD[k])
vbMng(self, "DEL", "Done building half-gramian.", 10)
vbMng(self, "INIT", "Solving svd for square root of gramian matrix.",
7)
_, s, eV = np.linalg.svd(Rstack, full_matrices = False)
ev = s[::-1]
eV = eV[::-1, :].T.conj()
vbMng(self, "MAIN",
("Solved svd problem of size {} x {} with condition number "
"{:.4e}.").format(*Rstack.shape, s[0] / s[-1]), 5)
vbMng(self, "DEL", "Done solving svd.", 7)
return ev, eV
def centerNormalize(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Normalized parameter.
"""
return self.trainedModel.centerNormalize(mu, mu0)
def getResidues(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues(*args, **kwargs)
diff --git a/rrompy/reduction_methods/distributed_greedy/generic_distributed_greedy_approximant.py b/rrompy/reduction_methods/distributed_greedy/generic_distributed_greedy_approximant.py
index d118582..d1ae9f5 100644
--- a/rrompy/reduction_methods/distributed_greedy/generic_distributed_greedy_approximant.py
+++ b/rrompy/reduction_methods/distributed_greedy/generic_distributed_greedy_approximant.py
@@ -1,580 +1,583 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.distributed.generic_distributed_approximant \
import GenericDistributedApproximant
from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple,
List, normEng, paramVal, paramList,
sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.solver import normEngine
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList, emptyParameterList
__all__ = ['GenericDistributedGreedyApproximant']
def pruneSamples(mus:paramList, badmus:paramList,
tol : float = 1e-8) -> paramList:
"""Remove from mus all the elements which are too close to badmus."""
if len(badmus) == 0: return mus
musNp = np.array(mus(0))
badmus = np.array(badmus(0))
proximity = np.min(np.abs(musNp.reshape(-1, 1)
- np.tile(badmus.reshape(1, -1), [len(mus), 1])),
axis = 1).flatten()
idxPop = np.arange(len(mus))[proximity <= tol]
for i, j in enumerate(idxPop):
mus.pop(j - i)
return mus
class GenericDistributedGreedyApproximant(GenericDistributedApproximant):
"""
ROM greedy interpolant computation for parametric problems
(ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- - 'trainSetGenerator': training sample points generator.
+ - 'trainSetGenerator': training sample points generator; defaults
+ to sampler.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- - 'nTestPoints': number of test points.
+ - 'nTestPoints': number of test points;
+ - 'trainSetGenerator': training sample points generator.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- - 'sampler': sample point generator;
- - 'trainSetGenerator': training sample points generator.
+ - 'sampler': sample point generator.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
TOL_INSTABILITY = 1e-6
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
- from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
self._addParametersToList(["greedyTol", "interactive", "maxIter",
"refinementRatio", "nTestPoints"],
[1e-2, False, 1e2, .2, 5e2],
- ["trainSetGenerator"],
- [QS([[0], [1]], "UNIFORM")])
- del QS
+ ["trainSetGenerator"], ["AUTO"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized, verbosity = verbosity,
timestamp = timestamp)
RROMPyAssert(self.HFEngine.npar, 1, "Parameter dimension")
self._postInit()
@property
def greedyTol(self):
"""Value of greedyTol."""
return self._greedyTol
@greedyTol.setter
def greedyTol(self, greedyTol):
if greedyTol < 0:
raise RROMPyException("greedyTol must be non-negative.")
if hasattr(self, "_greedyTol") and self.greedyTol is not None:
greedyTolold = self.greedyTol
else:
greedyTolold = -1
self._greedyTol = greedyTol
self._approxParameters["greedyTol"] = self.greedyTol
if greedyTolold != self.greedyTol:
self.resetSamples()
@property
def interactive(self):
"""Value of interactive."""
return self._interactive
@interactive.setter
def interactive(self, interactive):
self._interactive = interactive
@property
def maxIter(self):
"""Value of maxIter."""
return self._maxIter
@maxIter.setter
def maxIter(self, maxIter):
if maxIter <= 0: raise RROMPyException("maxIter must be positive.")
if hasattr(self, "_maxIter") and self.maxIter is not None:
maxIterold = self.maxIter
else:
maxIterold = -1
self._maxIter = maxIter
self._approxParameters["maxIter"] = self.maxIter
if maxIterold != self.maxIter:
self.resetSamples()
@property
def refinementRatio(self):
"""Value of refinementRatio."""
return self._refinementRatio
@refinementRatio.setter
def refinementRatio(self, refinementRatio):
if refinementRatio <= 0. or refinementRatio > 1.:
raise RROMPyException(("refinementRatio must be between 0 "
"(excluded) and 1."))
if (hasattr(self, "_refinementRatio")
and self.refinementRatio is not None):
refinementRatioold = self.refinementRatio
else:
refinementRatioold = -1
self._refinementRatio = refinementRatio
self._approxParameters["refinementRatio"] = self.refinementRatio
if refinementRatioold != self.refinementRatio:
self.resetSamples()
@property
def nTestPoints(self):
"""Value of nTestPoints."""
return self._nTestPoints
@nTestPoints.setter
def nTestPoints(self, nTestPoints):
if nTestPoints <= 0:
raise RROMPyException("nTestPoints must be positive.")
if not np.isclose(nTestPoints, np.int(nTestPoints)):
raise RROMPyException("nTestPoints must be an integer.")
nTestPoints = np.int(nTestPoints)
if hasattr(self, "_nTestPoints") and self.nTestPoints is not None:
nTestPointsold = self.nTestPoints
else:
nTestPointsold = -1
self._nTestPoints = nTestPoints
self._approxParameters["nTestPoints"] = self.nTestPoints
if nTestPointsold != self.nTestPoints:
self.resetSamples()
@property
def trainSetGenerator(self):
"""Value of trainSetGenerator."""
return self._trainSetGenerator
@trainSetGenerator.setter
def trainSetGenerator(self, trainSetGenerator):
+ if (isinstance(trainSetGenerator, (str,))
+ and trainSetGenerator.upper() == "AUTO"):
+ trainSetGenerator = self.sampler
if 'generatePoints' not in dir(trainSetGenerator):
raise RROMPyException("trainSetGenerator type not recognized.")
if (hasattr(self, '_trainSetGenerator')
- and self.trainSetGenerator is not None):
+ and self.trainSetGenerator not in [None, "AUTO"]):
trainSetGeneratorOld = self.trainSetGenerator
self._trainSetGenerator = trainSetGenerator
self._approxParameters["trainSetGenerator"] = self.trainSetGenerator
if (not 'trainSetGeneratorOld' in locals()
or trainSetGeneratorOld != self.trainSetGenerator):
self.resetSamples()
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._mus = emptyParameterList()
def initEstimatorNormEngine(self, normEngn : normEng = None):
"""Initialize estimator norm engine."""
if (normEngn is not None or not hasattr(self, "estimatorNormEngine")
or self.estimatorNormEngine is None):
if normEngn is None:
if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"):
self.HFEngine.buildEnergyNormPartialDualForm()
estimatorEnergyMatrix = (
self.HFEngine.energyNormPartialDualMatrix)
else:
if hasattr(normEngn, "buildEnergyNormPartialDualForm"):
if not hasattr(normEngn, "energyNormPartialDualMatrix"):
normEngn.buildEnergyNormPartialDualForm()
estimatorEnergyMatrix = (
normEngn.energyNormPartialDualMatrix)
else:
estimatorEnergyMatrix = normEngn
self.estimatorNormEngine = normEngine(estimatorEnergyMatrix)
def errorEstimator(self, mus:paramList) -> List[complex]:
"""
Standard residual-based error estimator with explicit residual
computation.
"""
self.setupApprox()
+ res = self.getRes(mus, homogeneized = self.homogeneized,
+ duality = False)
if self.HFEngine.nbs == 1:
RHS = self.getRHS(mus[0], homogeneized = self.homogeneized,
duality = False)
- RHSNorm = self.estimatorNormEngine.norm(RHS)
- res = self.getRes(mus, homogeneized = self.homogeneized,
- duality = False)
- err = self.estimatorNormEngine.norm(res) / RHSNorm
else:
- res = self.getRes(mus, homogeneized = self.homogeneized,
- duality = False)
RHS = self.getRHS(mus, homogeneized = self.homogeneized,
duality = False)
- err = (self.estimatorNormEngine.norm(res)
- / self.estimatorNormEngine.norm(RHS))
- return np.abs(err)
+ return np.abs(self.estimatorNormEngine.norm(res)
+ / self.estimatorNormEngine.norm(RHS))
def getMaxErrorEstimator(self, mus:paramList,
plot : bool = False) -> Tuple[Np1D, int, float]:
"""
Compute maximum of (and index of maximum of) error estimator over given
parameters.
"""
errorEstTest = self.errorEstimator(mus)
idxMaxEst = np.argmax(errorEstTest)
maxEst = errorEstTest[idxMaxEst]
if plot and not np.all(np.isinf(errorEstTest)):
musre = mus.re(0)
from matplotlib import pyplot as plt
plt.figure()
plt.semilogy(musre, errorEstTest, 'k')
plt.semilogy([musre[0], musre[-1]], [self.greedyTol] * 2, 'r--')
plt.semilogy(self.mus.re(0),
2. * self.greedyTol * np.ones(len(self.mus)), '*m')
plt.semilogy(musre[idxMaxEst], maxEst, 'xr')
plt.grid()
plt.show()
plt.close()
return errorEstTest, idxMaxEst, maxEst
def greedyNextSample(self, muidx:int, plotEst : bool = False)\
-> Tuple[Np1D, int, float, paramVal]:
"""Compute next greedy snapshot of solution map."""
RROMPyAssert(self._mode, message = "Cannot add greedy sample.")
mu = copy(self.muTest[muidx])
self.muTest.pop(muidx)
vbMng(self, "MAIN",
("Adding {}-th sample point at {} to training "
"set.").format(self.samplingEngine.nsamples + 1, mu), 2)
self.mus.append(mu)
self.samplingEngine.nextSample(mu, homogeneized = self.homogeneized)
errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator(
self.muTest, plotEst)
return errorEstTest, muidx, maxErrorEst, self.muTest[muidx]
def _preliminaryTraining(self):
"""Initialize starting snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
self.computeScaleFactor()
if self.samplingEngine.nsamples > 0:
return
vbMng(self, "INIT", "Starting computation of snapshots.", 2)
self.resetSamples()
self.mus = self.trainSetGenerator.generatePoints(self.S)
+ muTestBase = self.sampler.generatePoints(self.nTestPoints)
+ muTestBase = pruneSamples(muTestBase, self.mus,
+ 1e-10 * self.scaleFactor[0]).sort()
muLast = copy(self.mus[-1])
self.mus.pop()
- muTestBase = self.sampler.generatePoints(self.nTestPoints)
if len(self.mus) > 0:
+ nSamples = np.prod(self.S) - 1
vbMng(self, "MAIN",
- ("Adding first {} samples point at {} to training "
- "set.").format(np.prod(self.S) - 1, self.mus), 2)
+ ("Adding first {} sample point{} at {} to training "
+ "set.").format(nSamples, "" + "s" * (nSamples > 1),
+ self.mus), 2)
self.samplingEngine.iterSample(self.mus,
homogeneized = self.homogeneized)
- muTestBase = pruneSamples(muTestBase, self.mus,
- 1e-10 * self.scaleFactor[0]).sort()
self.muTest = emptyParameterList()
self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1]))
self.muTest[: -1] = muTestBase
self.muTest[-1] = muLast
def _enrichTestSet(self, nTest:int):
"""Add extra elements to test set."""
RROMPyAssert(self._mode, message = "Cannot enrich test set.")
muTestExtra = self.sampler.generatePoints(2 * nTest)
muTotal = copy(self.mus)
muTotal.append(self.muTest)
muTestExtra = pruneSamples(muTestExtra, muTotal,
1e-10 * self.scaleFactor[0])
muTestNew = np.empty(len(self.muTest) + len(muTestExtra),
dtype = np.complex)
muTestNew[: len(self.muTest)] = self.muTest(0)
muTestNew[len(self.muTest) :] = muTestExtra(0)
self.muTest = checkParameterList(muTestNew.sort(), self.npar)[0]
vbMng(self, "MAIN",
"Enriching test set by {} elements.".format(len(muTestExtra)), 5)
def greedy(self, plotEst : bool = False):
"""Compute greedy snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
self._preliminaryTraining()
nTest = self.nTestPoints
errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(-1,
plotEst)
vbMng(self, "MAIN",
"Uniform testing error estimate {:.4e}.".format(maxErrorEst), 2)
trainedModelOld = copy(self.trainedModel)
while (self.samplingEngine.nsamples < self.maxIter
and maxErrorEst > self.greedyTol):
if (1. - self.refinementRatio) * nTest > len(self.muTest):
self._enrichTestSet(nTest)
nTest = len(self.muTest)
muTestOld, maxErrorEstOld = self.muTest, maxErrorEst
errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(
muidx, plotEst)
vbMng(self, "MAIN",
"Uniform testing error estimate {:.4e}.".format(maxErrorEst),
2)
if (np.isnan(maxErrorEst) or np.isinf(maxErrorEst)
or maxErrorEstOld < maxErrorEst * self.TOL_INSTABILITY):
RROMPyWarning(("Instability in a posteriori estimator. "
"Starting preemptive greedy loop termination."))
maxErrorEst = maxErrorEstOld
self.muTest = muTestOld
self.mus = self.mus[:-1]
self.samplingEngine.popSample()
self.trainedModel.data = copy(trainedModelOld.data)
break
trainedModelOld.data = copy(self.trainedModel.data)
if (self.interactive and maxErrorEst <= self.greedyTol):
vbMng(self, "MAIN",
("Required tolerance {} achieved. Want to decrease "
"greedyTol and continue? Y/N").format(self.greedyTol),
0, end = "")
increasemaxIter = input()
if increasemaxIter.upper() == "Y":
vbMng(self, "MAIN", "Reducing value of greedyTol...", 0)
while maxErrorEst <= self._greedyTol:
self._greedyTol *= .5
if (self.interactive
and self.samplingEngine.nsamples >= self.maxIter):
vbMng(self, "MAIN",
("Maximum number of iterations {} reached. Want to "
"increase maxIter and continue? "
"Y/N").format(self.maxIter), 0, end = "")
increasemaxIter = input()
if increasemaxIter.upper() == "Y":
vbMng(self, "MAIN", "Doubling value of maxIter...", 0)
self._maxIter *= 2
vbMng(self, "DEL",
("Done computing snapshots (final snapshot count: "
"{}).").format(self.samplingEngine.nsamples), 2)
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return (super().checkComputedApprox()
and len(self.mus) == self.trainedModel.data.projMat.shape[1])
def assembleReducedResidualGramian(self, pMat:sampList):
"""
Build residual gramian of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
if (not hasattr(self.trainedModel.data, "gramian")
or self.trainedModel.data.gramian is None):
gramian = self.estimatorNormEngine.innerProduct(pMat, pMat)
else:
Sold = self.trainedModel.data.gramian.shape[0]
S = len(self.mus)
if Sold > S:
gramian = self.trainedModel.data.gramian[: S, : S]
else:
idxOld = list(range(Sold))
idxNew = list(range(Sold, S))
gramian = np.empty((S, S), dtype = np.complex)
gramian[: Sold, : Sold] = self.trainedModel.data.gramian
gramian[: Sold, Sold :] = (
self.estimatorNormEngine.innerProduct(pMat(idxNew),
pMat(idxOld)))
gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj()
gramian[Sold :, Sold :] = (
self.estimatorNormEngine.innerProduct(pMat(idxNew),
pMat(idxNew)))
self.trainedModel.data.gramian = gramian
- def assembleReducedResidualBlocksbb(self, bs:List[Np1D],
- scaling : float = 1.):
+ def assembleReducedResidualBlocksbb(self, bs:List[Np1D]):
"""
Build blocks (of type bb) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nbs = len(bs)
if (not hasattr(self.trainedModel.data, "resbb")
or self.trainedModel.data.resbb is None):
resbb = np.empty((nbs, nbs), dtype = np.complex)
for i in range(nbs):
- Mbi = scaling ** i * bs[i]
+ Mbi = bs[i]
resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi)
for j in range(i):
- Mbj = scaling ** j * bs[j]
+ Mbj = bs[j]
resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj,
Mbi)
for i in range(nbs):
for j in range(i + 1, nbs):
resbb[i, j] = resbb[j, i].conj()
self.trainedModel.data.resbb = resbb
def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D],
- pMat:sampList, scaling : float = 1.):
+ pMat:sampList):
"""
Build blocks (of type Ab) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nAs = len(As)
nbs = len(bs)
S = len(self.mus)
if (not hasattr(self.trainedModel.data, "resAb")
or self.trainedModel.data.resAb is None):
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAb = np.empty((nbs, S, nAs), dtype = np.complex)
for j in range(nAs):
- MAj = scaling ** (j + 1) * As[j].dot(pMat)
+ MAj = As[j].dot(pMat)
for i in range(nbs):
- Mbi = scaling ** (i + 1) * bs[i]
+ Mbi = bs[i]
resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj,
Mbi)
else:
Sold = self.trainedModel.data.resAb.shape[1]
if Sold == S: return
if Sold > S:
resAb = self.trainedModel.data.resAb[:, : S, :]
else:
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAb = np.empty((nbs, S, nAs), dtype = np.complex)
resAb[:, : Sold, :] = self.trainedModel.data.resAb
for j in range(nAs):
- MAj = scaling ** (j + 1) * As[j].dot(pMat[:, Sold :])
+ MAj = As[j].dot(pMat[:, Sold :])
for i in range(nbs):
- Mbi = scaling ** (i + 1) * bs[i]
+ Mbi = bs[i]
resAb[i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj, Mbi))
self.trainedModel.data.resAb = resAb
- def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList,
- scaling : float = 1.):
+ def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList):
"""
Build blocks (of type AA) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nAs = len(As)
S = len(self.mus)
if (not hasattr(self.trainedModel.data, "resAA")
or self.trainedModel.data.resAA is None):
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAA = np.empty((S, nAs, S, nAs), dtype = np.complex)
for i in range(nAs):
- MAi = scaling ** (i + 1) * As[i].dot(pMat)
+ MAi = As[i].dot(pMat)
resAA[:, i, :, i] = (
self.estimatorNormEngine.innerProduct(MAi, MAi))
for j in range(i):
- MAj = scaling ** (j + 1) * As[j].dot(pMat)
+ MAj = As[j].dot(pMat)
resAA[:, i, :, j] = (
self.estimatorNormEngine.innerProduct(MAj, MAi))
for i in range(nAs):
for j in range(i + 1, nAs):
resAA[:, i, :, j] = resAA[:, j, :, i].T.conj()
else:
Sold = self.trainedModel.data.resAA.shape[0]
if Sold == S: return
if Sold > S:
resAA = self.trainedModel.data.resAA[: S, :, : S, :]
else:
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAA = np.empty((S, nAs, S, nAs), dtype = np.complex)
resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA
for i in range(nAs):
- MAi = scaling ** (i + 1) * As[i].dot(pMat)
+ MAi = As[i].dot(pMat)
resAA[: Sold, i, Sold :, i] = (
self.estimatorNormEngine.innerProduct(MAi[:, Sold :],
MAi[:, : Sold]))
resAA[Sold :, i, : Sold, i] = resAA[: Sold, i,
Sold :, i].T.conj()
resAA[Sold :, i, Sold :, i] = (
self.estimatorNormEngine.innerProduct(MAi[:, Sold :],
MAi[:, Sold :]))
for j in range(i):
- MAj = scaling ** (j + 1) * As[j].dot(pMat)
+ MAj = As[j].dot(pMat)
resAA[: Sold, i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, Sold :],
MAi[:, : Sold]))
resAA[Sold :, i, : Sold, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, : Sold],
MAi[:, Sold :]))
resAA[Sold :, i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, Sold :],
MAi[:, Sold :]))
for i in range(nAs):
for j in range(i + 1, nAs):
resAA[: Sold, i, Sold :, j] = (
resAA[Sold :, j, : Sold, i].T.conj())
resAA[Sold :, i, : Sold, j] = (
resAA[: Sold, j, Sold :, i].T.conj())
resAA[Sold :, i, Sold :, j] = (
resAA[Sold :, j, Sold :, i].T.conj())
self.trainedModel.data.resAA = resAA
+ def assembleReducedResidualBlocks(self, full : bool = False):
+ """Build affine blocks of affine decomposition of residual."""
+ self.assembleReducedResidualBlocksbb(self.bs)
+ if full:
+ pMat = self.trainedModel.data.projMat
+ self.assembleReducedResidualBlocksAb(self.As, self.bs, pMat)
+ self.assembleReducedResidualBlocksAA(self.As, pMat)
diff --git a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
index b1f06b1..661bade 100644
--- a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
+++ b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
@@ -1,412 +1,355 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from .generic_distributed_greedy_approximant import \
GenericDistributedGreedyApproximant
-from rrompy.utilities.poly_fitting.polynomial import polybases
+from rrompy.utilities.poly_fitting.polynomial import (polybases, polydomcoeff,
+ PolynomialInterpolator as PI)
from rrompy.reduction_methods.distributed import RationalInterpolant
from rrompy.reduction_methods.trained_model import (
TrainedModelRational as tModel)
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import Np1D, Np2D, DictAny, HFEng, paramVal
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert
__all__ = ['RationalInterpolantGreedy']
class RationalInterpolantGreedy(GenericDistributedGreedyApproximant,
RationalInterpolant):
"""
ROM greedy rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'radialBasis': radial basis family for interpolant numerator;
defaults to 0, i.e. no radial basis;
- 'radialBasisWeights': radial basis weights for interpolant
numerator; defaults to 0, i.e. identity;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of training points to be exhausted
before training set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
- to Chebyshev sampler within muBounds;
+ to sampler;
- 'polybasis': type of basis for interpolation; allowed values
include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to
'MONOMIAL';
- - 'Delta': difference between M and N in rational approximant;
- defaults to 0;
- 'errorEstimatorKind': kind of error estimator; available values
include 'EXACT', 'BASIC', and 'BARE'; defaults to 'EXACT';
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'radialBasis': radial basis family for interpolant numerator;
defaults to 0, i.e. no radial basis;
- 'radialBasisWeights': radial basis weights for interpolant
numerator; defaults to 0, i.e. identity;
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator;
- - 'Delta': difference between M and N in rational approximant;
- 'errorEstimatorKind': kind of error estimator;
- 'interpRcond': tolerance for interpolation;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
radialBasis: Radial basis family for interpolant numerator.
radialBasisWeights: Radial basis weights for interpolant numerator.
greedyTol: uniform error tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
robustTol: tolerance for robust rational denominator management.
- Delta: difference between M and N in rational approximant.
errorEstimatorKind: kind of error estimator.
interpRcond: tolerance for interpolation.
robustTol: tolerance for robust rational denominator management.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
_allowedEstimatorKinds = ["EXACT", "BASIC", "BARE"]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
- self._addParametersToList(["Delta", "polybasis", "errorEstimatorKind",
+ self._addParametersToList(["polybasis", "errorEstimatorKind",
"interpRcond", "robustTol"],
- [0, "MONOMIAL", "EXACT", -1, 0],
+ ["MONOMIAL", "EXACT", -1, 0],
toBeExcluded = ["E"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
- vbMng(self, "INIT", "Computing Taylor blocks of system.", 7)
- nAs = self.HFEngine.nAs - 1
- nbs = max(self.HFEngine.nbs, (nAs + 1) * self.homogeneized)
- self.As = [self.HFEngine.A(self.mu0, j + 1) for j in range(nAs)]
- self.bs = [self.HFEngine.b(self.mu0, j, self.homogeneized)
- for j in range(nbs)]
- vbMng(self, "DEL", "Done computing Taylor blocks.", 7)
self._postInit()
+ @property
+ def E(self):
+ """Value of E."""
+ self._E = np.prod(self._S) - 1
+ return self._E
+ @E.setter
+ def E(self, E):
+ RROMPyWarning(("E is used just to simplify inheritance, and its value "
+ "cannot be changed from that of prod(S) - 1."))
+
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Sample type not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
- @property
- def Delta(self):
- """Value of Delta."""
- return self._Delta
- @Delta.setter
- def Delta(self, Delta):
- if not np.isclose(Delta, np.floor(Delta)):
- raise RROMPyException("Delta must be an integer.")
- if Delta < 0:
- RROMPyWarning(("Error estimator unreliable for Delta < 0. "
- "Overloading of errorEstimator is suggested."))
- else:
- Deltamin = (max(self.HFEngine.nbs,
- self.HFEngine.nAs * self.homogeneized)
- - 1 - 1 * (self.HFEngine.nAs > 1))
- if Delta < Deltamin:
- RROMPyWarning(("Method may be unreliable for selected Delta. "
- "Suggested minimal value of Delta: {}.").format(
- Deltamin))
- self._Delta = Delta
- self._approxParameters["Delta"] = self.Delta
-
@property
def errorEstimatorKind(self):
"""Value of errorEstimatorKind."""
return self._errorEstimatorKind
@errorEstimatorKind.setter
def errorEstimatorKind(self, errorEstimatorKind):
errorEstimatorKind = errorEstimatorKind.upper()
if errorEstimatorKind not in self._allowedEstimatorKinds:
RROMPyWarning(("Error estimator kind not recognized. Overriding "
"to 'EXACT'."))
errorEstimatorKind = "EXACT"
self._errorEstimatorKind = errorEstimatorKind
self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind
@property
def nTestPoints(self):
"""Value of nTestPoints."""
return self._nTestPoints
@nTestPoints.setter
def nTestPoints(self, nTestPoints):
- if nTestPoints <= np.abs(self.Delta):
- RROMPyWarning(("nTestPoints must be at least abs(Delta) + 1. "
- "Increasing value to abs(Delta) + 1."))
- nTestPoints = np.abs(self.Delta) + 1
+ if nTestPoints <= 0:
+ RROMPyWarning("nTestPoints must be at least 1. Overriding to 1.")
+ nTestPoints = 1
if not np.isclose(nTestPoints, np.int(nTestPoints)):
raise RROMPyException("nTestPoints must be an integer.")
nTestPoints = np.int(nTestPoints)
if hasattr(self, "_nTestPoints") and self.nTestPoints is not None:
nTestPointsold = self.nTestPoints
else: nTestPointsold = -1
self._nTestPoints = nTestPoints
self._approxParameters["nTestPoints"] = self.nTestPoints
if nTestPointsold != self.nTestPoints:
self.resetSamples()
def _errorSamplingRatio(self, mus:Np1D, muRTest:Np1D,
muRTrain:Np1D) -> Np1D:
"""Scalar ratio in explicit error estimator."""
self.setupApprox()
testTile = np.tile(np.reshape(muRTest, (-1, 1)), [1, len(muRTrain)])
nodalVals = np.prod(testTile - np.reshape(muRTrain, (1, -1)), axis = 1)
denVals = self.trainedModel.getQVal(mus)
return np.abs(nodalVals / denVals)
def _RHSNorms(self, radiusb0:Np2D) -> Np1D:
"""High fidelity system RHS norms."""
self.assembleReducedResidualBlocks(full = False)
# 'ij,jk,ik->k', resbb, radiusb0, radiusb0.conj()
b0resb0 = np.sum(self.trainedModel.data.resbb.dot(radiusb0)
* radiusb0.conj(), axis = 0)
RHSnorms = np.power(np.abs(b0resb0), .5)
return RHSnorms
def _errorEstimatorBare(self) -> Np1D:
"""Bare residual-based error estimator."""
self.setupApprox()
self.assembleReducedResidualGramian(self.trainedModel.data.projMat)
pDom = self.trainedModel.data.P.domCoeff
LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom))
Adiag = self.As[0].diagonal()
- LL = ((self.scaleFactor[0] * np.linalg.norm(Adiag)) ** 2. * LL
- / np.size(Adiag))
- return np.power(np.abs(LL), .5)
+ AnormApprox = np.linalg.norm(Adiag) ** 2. / np.size(Adiag)
+ return np.abs(AnormApprox * LL) ** .5
def _errorEstimatorBasic(self, muTest:paramVal, ratioTest:complex) -> Np1D:
"""Basic residual-based error estimator."""
- resmu = self.HFEngine.residual(self.trainedModel.getApprox(muTest),
- muTest, self.homogeneized,
- duality = False)[0]
+ resmu = self.HFEngine.residual(self.getApprox(muTest), muTest,
+ self.homogeneized, duality = False)[0]
return np.abs(self.estimatorNormEngine.norm(resmu) / ratioTest)
def _errorEstimatorExact(self, muRTrain:Np1D, vanderBase:Np2D) -> Np1D:
"""Exact residual-based error estimator."""
self.setupApprox()
self.assembleReducedResidualBlocks(full = True)
- nAs = self.HFEngine.nAs - 1
- nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized)
- delta = len(self.mus) - self.trainedModel.data.Q.deg[0]
- nbsEff = max(0, nbs - delta)
- momentQ = np.zeros(nbsEff, dtype = np.complex)
- momentQu = np.zeros((len(self.mus), nAs), dtype = np.complex)
- radiusbTen = np.zeros((nbsEff, nbsEff, vanderBase.shape[1]),
- dtype = np.complex)
- radiusATen = np.zeros((nAs, nAs, vanderBase.shape[1]),
- dtype = np.complex)
- if nbsEff > 0:
- momentQ[0] = self.trainedModel.data.Q.domCoeff
- radiusbTen[0, :, :] = vanderBase[: nbsEff, :]
- momentQu[:, 0] = self.trainedModel.data.P.domCoeff
- radiusATen[0, :, :] = vanderBase[: nAs, :]
+ nAsM = self.HFEngine.nAs - 1
+ nbsM = max(self.HFEngine.nbs - 1, nAsM * self.homogeneized)
+ radiusA = np.zeros((len(self.mus), nAsM, vanderBase.shape[1]),
+ dtype = np.complex)
+ radiusb = np.zeros((nbsM, vanderBase.shape[1]), dtype = np.complex)
+ domcoeff = polydomcoeff(self.trainedModel.data.Q.deg[0],
+ self.trainedModel.data.Q.polybasis)
Qvals = self.trainedModel.getQVal(self.mus)
- for k in range(1, max(nAs, nbs * (nbsEff > 0))):
+ for k in range(max(nAsM, nbsM)):
+ if k < nAsM:
+ radiusA[:, k :, :] += np.multiply.outer(Qvals * self._fitinv,
+ vanderBase[: nAsM - k, :])
+ if k < nbsM:
+ radiusb[k :, :] += (self._fitinv.dot(Qvals)
+ * vanderBase[: nbsM - k, :])
Qvals = Qvals * muRTrain
- if k > delta and k < nbs:
- momentQ[k - delta] = self._fitinv.dot(Qvals)
- radiusbTen[k - delta, k :, :] = (
- radiusbTen[0, : delta - k, :])
- if k < nAs:
- momentQu[:, k] = Qvals * self._fitinv
- radiusATen[k, k :, :] = radiusATen[0, : - k, :]
- if self.POD and nAs > 1:
- momentQu[:, 1 :] = self.samplingEngine.RPOD.dot(
- momentQu[:, 1 :])
- radiusA = np.tensordot(momentQu, radiusATen, 1)
- if nbsEff > 0:
- radiusb = np.tensordot(momentQ, radiusbTen, 1)
- # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
- ff = np.sum(self.trainedModel.data.resbb[delta + 1 :, delta + 1 :]\
- .dot(radiusb) * radiusb.conj(), axis = 0)
- # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
- Lf = np.sum(np.tensordot(
- self.trainedModel.data.resAb[delta :, :, :], radiusA, 2)
- * radiusb.conj(), axis = 0)
- else:
- ff, Lf = 0., 0.
+ if self.POD:
+ radiusA = np.tensordot(self.samplingEngine.RPOD, radiusA, 1)
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2)
* radiusA.conj(), axis = (0, 1))
- return np.power(np.abs(ff - 2. * np.real(Lf) + LL), .5)
+ # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
+ Lf = np.sum(np.tensordot(self.trainedModel.data.resAb[1 :, :, :],
+ radiusA, 2) * radiusb.conj(), axis = 0)
+ # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
+ ff = np.sum(self.trainedModel.data.resbb[1 :, 1 :].dot(radiusb)
+ * radiusb.conj(), axis = 0)
+ return domcoeff * np.abs(ff - 2. * np.real(Lf) + LL) ** .5
def errorEstimator(self, mus:Np1D) -> Np1D:
"""Standard residual-based error estimator."""
self.setupApprox()
- if (np.any(np.isnan(self.trainedModel.data.P.domCoeff))
- or np.any(np.isinf(self.trainedModel.data.P.domCoeff))):
- err = np.empty(len(mus))
- err[:] = np.inf
- return err
- nAs = self.HFEngine.nAs - 1
- nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized)
+ nAsM = self.HFEngine.nAs - 1
+ nbsM = max(self.HFEngine.nbs - 1, nAsM * self.homogeneized)
muRTest = self.centerNormalize(mus)(0)
muRTrain = self.centerNormalize(self.mus)(0)
samplingRatio = self._errorSamplingRatio(mus, muRTest, muRTrain)
- vanderBase = np.polynomial.polynomial.polyvander(muRTest,
- max(nAs, nbs)).T
- RHSnorms = self._RHSNorms(vanderBase[: nbs + 1, :])
+ if self.errorEstimatorKind == "EXACT":
+ vanSize = max(nAsM, nbsM)
+ else:
+ vanSize = nbsM
+ vanderBase = np.polynomial.polynomial.polyvander(muRTest, vanSize).T
+ RHSnorms = self._RHSNorms(vanderBase[: nbsM + 1, :])
if self.errorEstimatorKind == "BARE":
jOpt = self._errorEstimatorBare()
elif self.errorEstimatorKind == "BASIC":
idx_muTestSample = np.argmax(samplingRatio)
jOpt = self._errorEstimatorBasic(mus[idx_muTestSample],
samplingRatio[idx_muTestSample])
else: #if self.errorEstimatorKind == "EXACT":
jOpt = self._errorEstimatorExact(muRTrain, vanderBase[: -1, :])
return jOpt * samplingRatio / RHSnorms
def setupApprox(self, plotEst : bool = False):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
- vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
+ vbMng(self, "INIT", "Setting up {}.".format(self.name()), 5)
self.computeScaleFactor()
+ if not hasattr(self, "As") or not hasattr(self, "bs"):
+ vbMng(self, "INIT", "Computing affine blocks of system.", 10)
+ self.As = self.HFEngine.affineLinearSystemA(self.mu0,
+ self.scaleFactor)[1 :]
+ self.bs = self.HFEngine.affineLinearSystemb(self.mu0,
+ self.scaleFactor,
+ self.homogeneized)
+ vbMng(self, "DEL", "Done computing affine blocks.", 10)
self.greedy(plotEst)
self._S = len(self.mus)
- self._N, self._M, self._E = [self._S - 1] * 3
- if self.Delta < 0:
- self._M += self.Delta
- else:
- self._N -= self.Delta
+ self._N, self._M = [self._S - 1] * 2
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samples,
self.scaleFactor,
self.HFEngine.rescalingExp)
data.mus = copy(self.mus)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
self.trainedModel.data.mus = copy(self.mus)
- if min(self.M, self.N) < 0:
- vbMng(self, "MAIN", "Minimal sample size not achieved.", 5)
- Q = np.empty(tuple([max(self.N, 0) + 1] * self.npar),
- dtype = np.complex)
- P = np.empty(tuple([max(self.M, 0) + 1] * self.npar)
- + (len(self.mus),), dtype = np.complex)
- Q[:] = np.nan
- P[:] = np.nan
- self.trainedModel.data.Q = copy(Q)
- self.trainedModel.data.P = copy(P)
- self.trainedModel.data.approxParameters = copy(
- self.approxParameters)
- vbMng(self, "DEL", "Aborting computation of approximant.", 5)
- return
+ self.catchInstability = True
if self.N > 0:
- Q = self._setupDenominator()
+ Qf = self._setupDenominator()
+ Q = Qf[0]
+ if self.errorEstimatorKind == "EXACT": self._fitinv = Qf[1]
else:
- Q = np.ones(tuple([1] * self.npar), dtype = np.complex)
+ Q = PI()
+ Q.coeffs = np.ones(1, dtype = np.complex)
+ Q.npar = 1
+ Q.polybasis = self.polybasis
+ if self.errorEstimatorKind == "EXACT": self._fitinv = np.ones(1)
self.trainedModel.data.Q = copy(Q)
self.trainedModel.data.P = copy(self._setupNumerator())
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
-
- def assembleReducedResidualBlocks(self, full : bool = False):
- """Build affine blocks of reduced linear system through projections."""
- scaling = self.trainedModel.data.scaleFactor[0]
- self.assembleReducedResidualBlocksbb(self.bs, scaling)
- if full:
- pMat = self.trainedModel.data.projMat
- self.assembleReducedResidualBlocksAb(self.As, self.bs[1 :],
- pMat, scaling)
- self.assembleReducedResidualBlocksAA(self.As, pMat, scaling)
-
diff --git a/rrompy/reduction_methods/distributed_greedy/reduced_basis_greedy.py b/rrompy/reduction_methods/distributed_greedy/reduced_basis_greedy.py
index 9fc0247..8e85392 100644
--- a/rrompy/reduction_methods/distributed_greedy/reduced_basis_greedy.py
+++ b/rrompy/reduction_methods/distributed_greedy/reduced_basis_greedy.py
@@ -1,239 +1,228 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
from .generic_distributed_greedy_approximant import \
GenericDistributedGreedyApproximant
from rrompy.reduction_methods.distributed import ReducedBasis
from rrompy.reduction_methods.trained_model import \
TrainedModelReducedBasis as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import Np1D, DictAny, HFEng, paramVal
from rrompy.utilities.base import verbosityManager as vbMng
-from rrompy.utilities.poly_fitting.polynomial import (
- hashIdxToDerivative as hashI)
from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyAssert
from rrompy.parameter import checkParameterList
__all__ = ['ReducedBasisGreedy']
class ReducedBasisGreedy(GenericDistributedGreedyApproximant, ReducedBasis):
"""
ROM greedy RB approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
- to Chebyshev sampler within muBounds.
+ to sampler.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
As: List of sparse matrices (in CSC format) representing coefficients
of linear system matrix.
bs: List of numpy vectors representing coefficients of linear system
RHS.
ARBs: List of sparse matrices (in CSC format) representing coefficients
of compressed linear system matrix.
bRBs: List of numpy vectors representing coefficients of compressed
linear system RHS.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
+ self._addParametersToList([], [], toBeExcluded = ["R", "PODTolerance"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def R(self):
"""Value of R."""
self._R = np.prod(self._S)
return self._R
@R.setter
def R(self, R):
RROMPyWarning(("R is used just to simplify inheritance, and its value "
"cannot be changed from that of prod(S)."))
@property
def PODTolerance(self):
"""Value of PODTolerance."""
self._PODTolerance = -1
return self._PODTolerance
@PODTolerance.setter
def PODTolerance(self, PODTolerance):
RROMPyWarning(("PODTolerance is used just to simplify inheritance, "
"and its value cannot be changed from -1."))
def errorEstimator(self, mus:Np1D) -> Np1D:
"""
Standard residual-based error estimator. Unreliable for unstable
problems (inf-sup constant is missing).
"""
self.setupApprox()
self.assembleReducedResidualBlocks(full = True)
nmus = len(mus)
nAs = self.trainedModel.data.resAA.shape[1]
nbs = self.trainedModel.data.resbb.shape[0]
radiusA = np.empty((len(self.mus), nAs, nmus), dtype = np.complex)
radiusb = np.empty((nbs, nmus), dtype = np.complex)
+ mustr = mus
+ if nmus > 2:
+ mustr = "[{} ..({}).. {}]".format(mus[0], nmus - 2, mus[-1])
+ vbMng(self.trainedModel, "INIT",
+ "Computing RB solution at mu = {}.".format(mustr), 5)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
- if verb >= 5:
- mustr = mus
- if nmus > 2:
- mustr = "[{} ..({}).. {}]".format(mus[0], nmus - 2, mus[-1])
- vbMng(self, "INIT",
- "Computing RB solution at mu = {}.".format(mustr), 0)
parmus = checkParameterList(mus, self.npar)[0]
uApproxRs = self.getApproxReduced(parmus)
mu0Eff = self.mu0(0, 0) ** self.HFEngine.rescalingExp[0]
for j, muPL in enumerate(parmus):
uApproxR = uApproxRs[j]
for i in range(max(nAs, nbs)):
thetai = ((muPL[0] ** self.HFEngine.rescalingExp[0]
- mu0Eff) / self.scaleFactor[0]) ** i
if i < nAs:
radiusA[:, i, j] = thetai * uApproxR
if i < nbs:
radiusb[i, j] = thetai
- vbMng(self, "DEL", "Done computing RB solution.", 5)
self.trainedModel.verbosity = verb
+ vbMng(self.trainedModel, "DEL", "Done computing RB solution.", 5)
# 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
ff = np.sum(self.trainedModel.data.resbb.dot(radiusb) * radiusb.conj(),
axis = 0)
# 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, radiusA, 2)
* radiusb.conj(), axis = 0)
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2)
* radiusA.conj(), axis = (0, 1))
return np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5
def setupApprox(self, plotEst : bool = False):
"""Compute RB projection matrix."""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.computeScaleFactor()
self.greedy(plotEst)
vbMng(self, "INIT", "Computing projection matrix.", 7)
pMat = self.samplingEngine.samples
vbMng(self, "INIT", "Computing affine blocks of system.", 10)
self.As = self.HFEngine.affineLinearSystemA(self.mu0, self.scaleFactor)
self.bs = self.HFEngine.affineLinearSystemb(self.mu0, self.scaleFactor,
self.homogeneized)
vbMng(self, "DEL", "Done computing affine blocks.", 10)
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
pMat, self.scaleFactor,
self.HFEngine.rescalingExp)
ARBs, bRBs = self.assembleReducedSystem(pMat)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
pMatOld = self.trainedModel.data.projMat
Sold = pMatOld.shape[1]
idxNew = list(range(Sold, pMat.shape[1]))
ARBs, bRBs = self.assembleReducedSystem(pMat(idxNew), pMatOld)
self.trainedModel.data.projMat = copy(pMat)
self.trainedModel.data.mus = copy(self.mus)
self.trainedModel.data.ARBs = ARBs
self.trainedModel.data.bRBs = bRBs
vbMng(self, "DEL", "Done computing projection matrix.", 7)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
-
- def assembleReducedResidualBlocks(self, full : bool = False):
- """Build affine blocks of RB linear system through projections."""
- self.assembleReducedResidualBlocksbb(self.bs)
- if full:
- pMat = self.trainedModel.data.projMat
- self.assembleReducedResidualBlocksAb(self.As, self.bs, pMat)
- self.assembleReducedResidualBlocksAA(self.As, pMat)
-
diff --git a/rrompy/reduction_methods/pivoting/rational_interpolant_pivoted.py b/rrompy/reduction_methods/pivoting/rational_interpolant_pivoted.py
index 15a07ea..d8d277d 100644
--- a/rrompy/reduction_methods/pivoting/rational_interpolant_pivoted.py
+++ b/rrompy/reduction_methods/pivoting/rational_interpolant_pivoted.py
@@ -1,605 +1,614 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from .generic_pivoted_approximant import GenericPivotedApproximant
from rrompy.reduction_methods.distributed.rational_interpolant import (
RationalInterpolant as RI)
from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname,
nextDerivativeIndices,
hashDerivativeToIdx as hashD,
hashIdxToDerivative as hashI,
homogeneizedpolyvander as hpvP,
homogeneizedToFull,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (rbbases,
RadialBasisInterpolator as RBI)
from rrompy.reduction_methods.trained_model import (TrainedModelPivotedData,
TrainedModelPivotedRational as tModel)
-from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, List,
- ListAny, paramVal, paramList)
+from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple,
+ List, ListAny, paramVal, paramList)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import multifactorial, customPInv
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameter
__all__ = ['RationalInterpolantPivoted']
class RationalInterpolantPivoted(GenericPivotedApproximant):
"""
ROM pivoted rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
directionPivot(optional): Pivot components. Defaults to [0].
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator;
- 'polybasisPivot': type of polynomial basis for pivot
interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV'
and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV'
and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'E': number of derivatives used to compute interpolant; defaults
to 0;
- 'M': degree of rational interpolant numerator; defaults to 0;
- 'N': degree of rational interpolant denominator; defaults to 0;
- 'MMarginal': degree of marginal interpolant; defaults to 0;
- 'radialBasisPivot': radial basis family for pivot numerator;
defaults to 0, i.e. no radial basis;
- 'radialBasisMarginal': radial basis family for marginal
interpolant; defaults to 0, i.e. no radial basis;
- 'radialBasisWeightsPivot': radial basis weights for pivot
numerator; defaults to 0, i.e. identity;
- 'radialBasisWeightsMarginal': radial basis weights for marginal
interpolant; defaults to 0, i.e. identity;
- 'interpRcondPivot': tolerance for pivot interpolation; defaults
to None;
- 'interpRcondMarginal': tolerance for marginal interpolation;
defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
directionPivot: Pivot components.
mus: Array of snapshot parameters.
musPivot: Array of pivot snapshot parameters.
musMarginal: Array of marginal snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'polybasisPivot': type of polynomial basis for pivot
interpolation;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation;
- 'E': number of derivatives used to compute interpolant;
- 'M': degree of rational interpolant numerator;
- 'N': degree of rational interpolant denominator;
- 'MMarginal': degree of marginal interpolant;
- 'radialBasisPivot': radial basis family for pivot numerator;
- 'radialBasisMarginal': radial basis family for marginal
interpolant;
- 'radialBasisWeightsPivot': radial basis weights for pivot
numerator;
- 'radialBasisWeightsMarginal': radial basis weights for marginal
interpolant;
- 'interpRcondPivot': tolerance for pivot interpolation;
- 'interpRcondMarginal': tolerance for marginal interpolation;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator.
POD: Whether to compute POD of snapshots.
S: Total number of pivot samples current approximant relies upon.
sampler: Pivot sample point generator.
polybasisPivot: Type of polynomial basis for pivot interpolation.
polybasisMarginal: Type of polynomial basis for marginal interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
MMarginal: Degree of marginal interpolant.
radialBasisPivot: Radial basis family for pivot numerator.
radialBasisMarginal: Radial basis family for marginal interpolant.
radialBasisWeightsPivot: Radial basis weights for pivot numerator.
radialBasisWeightsMarginal: Radial basis weights for marginal
interpolant.
interpRcondPivot: Tolerance for pivot interpolation.
interpRcondMarginal: Tolerance for marginal interpolation.
robustTol: Tolerance for robust rational denominator management.
E: Complete derivative depth level of S.
muBoundsPivot: list of bounds for pivot parameter values.
muBoundsMarginal: list of bounds for marginal parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
directionPivot : ListAny = [0],
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(
["polybasisPivot", "E", "M", "N",
"radialBasisPivot", "radialBasisWeightsPivot",
"interpRcondPivot", "robustTol"],
["MONOMIAL", -1, 0, 0, 0, 1, -1, 0])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
directionPivot = directionPivot,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def polybasisPivot(self):
"""Value of polybasisPivot."""
return self._polybasisPivot
@polybasisPivot.setter
def polybasisPivot(self, polybasisPivot):
try:
polybasisPivot = polybasisPivot.upper().strip().replace(" ","")
if polybasisPivot not in polybases:
raise RROMPyException(
"Prescribed pivot polybasis not recognized.")
self._polybasisPivot = polybasisPivot
except:
RROMPyWarning(("Prescribed pivot polybasis not recognized. "
"Overriding to 'MONOMIAL'."))
self._polybasisPivot = "MONOMIAL"
self._approxParameters["polybasisPivot"] = self.polybasisPivot
@property
def radialBasisPivot(self):
"""Value of radialBasisPivot."""
return self._radialBasisPivot
@radialBasisPivot.setter
def radialBasisPivot(self, radialBasisPivot):
try:
if radialBasisPivot != 0:
radialBasisPivot = radialBasisPivot.upper().strip().replace(
" ","")
if radialBasisPivot not in rbbases:
raise RROMPyException(("Prescribed pivot radialBasis not "
"recognized."))
self._radialBasisPivot = radialBasisPivot
except:
RROMPyWarning(("Prescribed pivot radialBasis not recognized. "
"Overriding to 0."))
self._radialBasisPivot = 0
self._approxParameters["radialBasisPivot"] = self.radialBasisPivot
@property
def radialBasisWeightsPivot(self):
"""Value of radialBasisWeightsPivot."""
return self._radialBasisWeightsPivot
@radialBasisWeightsPivot.setter
def radialBasisWeightsPivot(self, radialBasisWeightsPivot):
self._radialBasisWeightsPivot = radialBasisWeightsPivot
self._approxParameters["radialBasisWeightsPivot"] = (
self.radialBasisWeightsPivot)
@property
def polybasisPivotP(self):
if self.radialBasisPivot == 0:
return self._polybasisPivot
return self._polybasisPivot + "_" + self.radialBasisPivot
@property
def interpRcondPivot(self):
"""Value of interpRcondPivot."""
return self._interpRcondPivot
@interpRcondPivot.setter
def interpRcondPivot(self, interpRcondPivot):
self._interpRcondPivot = interpRcondPivot
self._approxParameters["interpRcondPivot"] = self.interpRcondPivot
@property
def M(self):
"""Value of M. Its assignment may change S."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
if hasattr(self, "_E") and self.E >= 0 and self.E < self.M:
RROMPyWarning("Prescribed S is too small. Decreasing M.")
self.M = self.E
@property
def N(self):
"""Value of N. Its assignment may change S."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
if hasattr(self, "_E") and self.E >= 0 and self.E < self.N:
RROMPyWarning("Prescribed S is too small. Decreasing N.")
self.N = self.E
@property
def E(self):
"""Value of E."""
return self._E
@E.setter
def E(self, E):
if E < 0:
if not hasattr(self, "_S"):
raise RROMPyException(("Value of E must be positive if S is "
"not yet assigned."))
E = np.sum(hashI(np.prod(self.S), len(self.directionPivot))) - 1
self._E = E
self._approxParameters["E"] = self.E
if (hasattr(self, "_S")
and self.E >= np.sum(hashI(np.prod(self.S),len(self.directionPivot)))):
RROMPyWarning("Prescribed S is too small. Decreasing E.")
self.E = -1
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
@property
def robustTol(self):
"""Value of tolerance for robust rational denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
GenericPivotedApproximant.S.fset(self, S)
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
if hasattr(self, "_E"): self.E = self.E
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musPUniqueCN = None
self._derPIdxs = None
self._reorderP = None
def _setupPivotInterpolationIndices(self):
"""Setup parameters for polyvander."""
RROMPyAssert(self._mode,
message = "Cannot setup interpolation indices.")
if (self._musPUniqueCN is None
or len(self._reorderP) != len(self.musPivot)):
self._musPUniqueCN, musPIdxsTo, musPIdxs, musPCount = (
self.centerNormalizePivot(self.musPivot).unique(
return_index = True,
return_inverse = True,
return_counts = True))
self._musPUnique = self.mus[musPIdxsTo]
self._derPIdxs = [None] * len(self._musPUniqueCN)
self._reorderP = np.empty(len(musPIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musPCount):
self._derPIdxs[j] = nextDerivativeIndices([],
self.musPivot.shape[1], cnt)
jIdx = np.nonzero(musPIdxs == j)[0]
self._reorderP[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupDenominator(self):
"""Compute rational denominator."""
RROMPyAssert(self._mode, message = "Cannot setup denominator.")
vbMng(self, "INIT", "Starting computation of denominator.", 7)
NinvD = None
N0 = copy(self.N)
qs = []
self.verbosity -= 10
for j in range(len(self.musMarginal)):
self._N = N0
while self.N > 0:
if NinvD != self.N:
- invD = self._computeInterpolantInverseBlocks()
+ invD, fitinvP = self._computeInterpolantInverseBlocks()
NinvD = self.N
if self.POD:
ev, eV = RI.findeveVGQR(self, self.samplingEngine.RPOD[j],
invD)
else:
ev, eV = RI.findeveVGExplicit(self,
self.samplingEngine.samples[j], invD)
- newParams = checkRobustTolerance(ev, self.N, self.robustTol)
- if not newParams:
- break
- self.approxParameters = newParams
+ nevBad = checkRobustTolerance(ev, self.robustTol)
+ if nevBad <= 1: break
+ if self.catchInstability:
+ raise RROMPyException(("Instability in denominator "
+ "computation: eigenproblem is "
+ "poorly conditioned."))
+ RROMPyWarning(("Smallest {} eigenvalues below tolerance. "
+ "Reducing N by 1.").format(nevBad))
+ self.N = self.N - 1
if self.N <= 0:
self._N = 0
eV = np.ones((1, 1))
q = PI()
q.npar = self.musPivot.shape[1]
q.polybasis = self.polybasisPivot
q.coeffs = homogeneizedToFull(tuple([self.N + 1] * q.npar),
q.npar, eV[:, 0])
qs = qs + [copy(q)]
self.verbosity += 10
vbMng(self, "DEL", "Done computing denominator.", 7)
- return qs
+ return qs, fitinvP
def _setupNumerator(self):
"""Compute rational numerator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
vbMng(self, "INIT", "Starting computation of numerator.", 7)
Qevaldiag = np.zeros((len(self.musPivot), len(self.musPivot)),
dtype = np.complex)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
self._setupPivotInterpolationIndices()
M0 = copy(self.M)
tensor_idx = 0
ps = []
for k, muM in enumerate(self.musMarginal):
self._M = M0
idxGlob = 0
for j, derIdxs in enumerate(self._derPIdxs):
mujEff = [fp] * self.npar
for jj, kk in enumerate(self.directionPivot):
mujEff[kk] = self._musPUnique[j, jj]
for jj, kk in enumerate(self.directionMarginal):
mujEff[kk] = muM(0, jj)
mujEff = checkParameter(mujEff, self.npar)
nder = len(derIdxs)
idxLoc = np.arange(len(self.musPivot))[
(self._reorderP >= idxGlob)
* (self._reorderP < idxGlob + nder)]
idxGlob += nder
Qval = [0] * nder
for der in range(nder):
derIdx = hashI(der, self.musPivot.shape[1])
derIdxEff = [0] * self.npar
sclEff = [0] * self.npar
for jj, kk in enumerate(self.directionPivot):
derIdxEff[kk] = derIdx[jj]
sclEff[kk] = self.scaleFactorPivot[jj] ** -1.
Qval[der] = (self.trainedModel.getQVal(mujEff, derIdxEff,
scl = sclEff)
/ multifactorial(derIdx))
for derU, derUIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)]
if all([x >= 0 for x in diffIdx]):
diffj = hashD(diffIdx)
Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj]
while self.M >= 0:
if self.radialBasisPivot == 0:
p = PI()
wellCond, msg = p.setupByInterpolation(
self._musPUniqueCN, Qevaldiag, self.M,
self.polybasisPivotP, self.verbosity >= 5, True,
{"derIdxs": self._derPIdxs,
"reorder": self._reorderP,
"scl": np.power(self.scaleFactorPivot, -1.)},
{"rcond": self.interpRcondPivot})
else:
p = RBI()
wellCond, msg = p.setupByInterpolation(
self._musPUniqueCN, Qevaldiag, self.M,
self.polybasisPivotP,
self.radialBasisWeightsPivot,
self.verbosity >= 5, True,
{"derIdxs": self._derPIdxs,
"reorder": self._reorderP,
"scl": np.power(self.scaleFactorPivot, -1.)},
{"rcond": self.interpRcondPivot})
vbMng(self, "MAIN", msg, 5)
if wellCond: break
+ if self.catchInstability:
+ raise RROMPyException(("Instability in numerator "
+ "computation: polyfit is "
+ "poorly conditioned."))
RROMPyWarning(("Polyfit is poorly conditioned. "
"Reducing M by 1."))
self.M -= 1
if self.M < 0:
raise RROMPyException(("Instability in computation of "
"numerator. Aborting."))
tensor_idx_new = tensor_idx + Qevaldiag.shape[1]
if self.POD:
p.postmultiplyTensorize(self.samplingEngine.RPODCoalesced.T[
tensor_idx : tensor_idx_new, :])
else:
p.pad(tensor_idx, len(self.mus) - tensor_idx_new)
tensor_idx = tensor_idx_new
ps = ps + [copy(p)]
self.trainedModel.verbosity = verb
vbMng(self, "DEL", "Done computing numerator.", 7)
return ps
def setupApprox(self):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.computeScaleFactor()
self.computeSnapshots()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelPivotedData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samplesCoalesced,
self.scaleFactor,
self.HFEngine.rescalingExp,
self.directionPivot)
data.musPivot = copy(self.musPivot)
data.musMarginal = copy(self.musMarginal)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
self.trainedModel.data.marginalInterp = self._setupMarginalInterp()
if self.N > 0:
- Qs = self._setupDenominator()
+ Qs = self._setupDenominator()[0]
else:
Q = PI()
Q.npar = self.musPivot.shape[1]
Q.coeffs = np.ones(tuple([1] * Q.npar), dtype = np.complex)
Q.polybasis = self.polybasisPivot
Qs = [Q for _ in range(len(self.musMarginal))]
self.trainedModel.data.Qs = Qs
self.trainedModel.data.Ps = self._setupNumerator()
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
- def _computeInterpolantInverseBlocks(self) -> List[Np2D]:
+ def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]:
"""
Compute inverse factors for minimal interpolant target functional.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self._setupPivotInterpolationIndices()
while self.E >= 0:
eWidth = (hashD([self.E + 1] + [0] * (self.musPivot.shape[1] - 1))
- hashD([self.E] + [0] * (self.musPivot.shape[1] - 1)))
TE, _, argIdxs = hpvP(self._musPUniqueCN, self.E,
self.polybasisPivot, self._derPIdxs,
self._reorderP,
scl = np.power(self.scaleFactorPivot, -1.))
fitOut = customPInv(TE[:, argIdxs], rcond = self.interpRcondPivot,
full = True)
vbMng(self, "MAIN",
("Fitting {} samples with degree {} through {}... "
"Conditioning of pseudoinverse system: {:.4e}.").format(
TE.shape[0], self.E,
polyfitname(self.polybasisPivot),
fitOut[1][1][0] / fitOut[1][1][-1]),
5)
if fitOut[1][0] == len(argIdxs):
- self._fitinvP = fitOut[0][- eWidth : , :]
+ fitinvP = fitOut[0][- eWidth : , :]
break
RROMPyWarning("Polyfit is poorly conditioned. Reducing E by 1.")
self.E -= 1
if self.E < 0:
raise RROMPyException(("Instability in computation of "
"denominator. Aborting."))
TN, _, argIdxs = hpvP(self._musPUniqueCN, self.N, self.polybasisPivot,
self._derPIdxs, self._reorderP,
scl = np.power(self.scaleFactorPivot, -1.))
TN = TN[:, argIdxs]
invD = [None] * (eWidth)
for k in range(eWidth):
- pseudoInv = np.diag(self._fitinvP[k, :])
+ pseudoInv = np.diag(fitinvP[k, :])
idxGlob = 0
for j, derIdxs in enumerate(self._derPIdxs):
nder = len(derIdxs)
idxGlob += nder
if nder > 1:
idxLoc = np.arange(len(self.musPivot))[
(self._reorderP >= idxGlob - nder)
* (self._reorderP < idxGlob)]
- invLoc = self._fitinvP[k, idxLoc]
+ invLoc = fitinvP[k, idxLoc]
pseudoInv[np.ix_(idxLoc, idxLoc)] = 0.
for diffj, diffjIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
derUIdx = [x - y for (x, y) in
zip(diffjIdx, derQIdx)]
if all([x >= 0 for x in derUIdx]):
derU = hashD(derUIdx)
pseudoInv[idxLoc[derU], idxLoc[derQ]] = (
invLoc[diffj])
invD[k] = pseudoInv.dot(TN)
- return invD
+ return invD, fitinvP
def centerNormalize(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Normalized parameter.
"""
return self.trainedModel.centerNormalize(mu, mu0)
def centerNormalizePivot(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Normalized parameter.
"""
return self.trainedModel.centerNormalizePivot(mu, mu0)
def getResidues(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues(*args, **kwargs)
diff --git a/rrompy/reduction_methods/pole_matching/rational_interpolant_pole_matching.py b/rrompy/reduction_methods/pole_matching/rational_interpolant_pole_matching.py
index 736d68b..7f8a4f7 100644
--- a/rrompy/reduction_methods/pole_matching/rational_interpolant_pole_matching.py
+++ b/rrompy/reduction_methods/pole_matching/rational_interpolant_pole_matching.py
@@ -1,222 +1,222 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.pivoting.rational_interpolant_pivoted import \
RationalInterpolantPivoted
from .generic_pole_matching_approximant import GenericPoleMatchingApproximant
from rrompy.utilities.poly_fitting.polynomial import (
PolynomialInterpolator as PI)
from rrompy.reduction_methods.trained_model import (TrainedModelPivotedData,
TrainedModelPoleMatchingRational as tModel)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyAssert
__all__ = ['RationalInterpolantPoleMatching']
class RationalInterpolantPoleMatching(GenericPoleMatchingApproximant,
RationalInterpolantPivoted):
"""
ROM pivoted rational interpolant computation for parametric problems with
pole matching.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
directionPivot(optional): Pivot components. Defaults to [0].
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'matchingWeight': weight for pole matching optimization; defaults
to 1;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
defaults to np.inf;
- 'cutOffType': rule for tolerance computation for parasitic poles;
defaults to 'MAGNITUDE';
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator;
- 'polybasisPivot': type of polynomial basis for pivot
interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV'
and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV'
and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'E': number of derivatives used to compute interpolant; defaults
to 0;
- 'M': degree of rational interpolant numerator; defaults to 0;
- 'N': degree of rational interpolant denominator; defaults to 0;
- 'MMarginal': degree of marginal interpolant; defaults to 0;
- 'radialBasisPivot': radial basis family for pivot numerator;
defaults to 0, i.e. no radial basis;
- 'radialBasisMarginal': radial basis family for marginal
interpolant; defaults to 0, i.e. no radial basis;
- 'radialBasisWeightsPivot': radial basis weights for pivot
numerator; defaults to 0, i.e. identity;
- 'radialBasisWeightsMarginal': radial basis weights for marginal
interpolant; defaults to 0, i.e. identity;
- 'interpRcondPivot': tolerance for pivot interpolation; defaults
to None;
- 'interpRcondMarginal': tolerance for marginal interpolation;
defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
directionPivot: Pivot components.
mus: Array of snapshot parameters.
musPivot: Array of pivot snapshot parameters.
musMarginal: Array of marginal snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'matchingWeight': weight for pole matching optimization;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
- 'cutOffType': rule for tolerance computation for parasitic poles;
- 'polybasisPivot': type of polynomial basis for pivot
interpolation;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation;
- 'E': number of derivatives used to compute interpolant;
- 'M': degree of rational interpolant numerator;
- 'N': degree of rational interpolant denominator;
- 'MMarginal': degree of marginal interpolant;
- 'radialBasisPivot': radial basis family for pivot numerator;
- 'radialBasisMarginal': radial basis family for marginal
interpolant;
- 'radialBasisWeightsPivot': radial basis weights for pivot
numerator;
- 'radialBasisWeightsMarginal': radial basis weights for marginal
interpolant;
- 'interpRcondPivot': tolerance for pivot interpolation;
- 'interpRcondMarginal': tolerance for marginal interpolation;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator.
POD: Whether to compute POD of snapshots.
matchingWeight: Weight for pole matching optimization.
cutOffTolerance: Tolerance for ignoring parasitic poles.
cutOffType: Rule for tolerance computation for parasitic poles.
S: Total number of pivot samples current approximant relies upon.
sampler: Pivot sample point generator.
polybasisPivot: Type of polynomial basis for pivot interpolation.
polybasisMarginal: Type of polynomial basis for marginal interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
MMarginal: Degree of marginal interpolant.
radialBasisPivot: Radial basis family for pivot numerator.
radialBasisMarginal: Radial basis family for marginal interpolant.
radialBasisWeightsPivot: Radial basis weights for pivot numerator.
radialBasisWeightsMarginal: Radial basis weights for marginal
interpolant.
interpRcondPivot: Tolerance for pivot interpolation.
interpRcondMarginal: Tolerance for marginal interpolation.
robustTol: Tolerance for robust rational denominator management.
E: Complete derivative depth level of S.
muBoundsPivot: list of bounds for pivot parameter values.
muBoundsMarginal: list of bounds for marginal parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
"""
def setupApprox(self):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.computeScaleFactor()
self.computeSnapshots()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelPivotedData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samplesCoalesced,
self.scaleFactor,
self.HFEngine.rescalingExp,
self.directionPivot)
data.musPivot = copy(self.musPivot)
data.musMarginal = copy(self.musMarginal)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(
self.samplingEngine.samplesCoalesced)
self.trainedModel.data.marginalInterp = self._setupMarginalInterp()
if self.N > 0:
- Qs = self._setupDenominator()
+ Qs = self._setupDenominator()[0]
else:
Q = PI()
Q.npar = self.musPivot.shape[1]
Q.coeffs = np.ones(tuple([1] * Q.npar), dtype = np.complex)
Q.polybasis = self.polybasisPivot
Qs = [Q for _ in range(len(self.musMarginal))]
self.trainedModel.data._temporary = True
self.trainedModel.data.Qs = Qs
self.trainedModel.data.Ps = self._setupNumerator()
vbMng(self, "INIT", "Matching poles.", 10)
self.trainedModel.initializeFromRational(self.HFEngine,
self.matchingWeight, self.POD)
vbMng(self, "DEL", "Done matching poles.", 10)
if not np.isinf(self.cutOffTolerance):
vbMng(self, "INIT", "Recompressing by cut-off.", 10)
msg = self.trainedModel.recompressByCutOff([-1., 1.],
self.cutOffTolerance,
self.cutOffType)
vbMng(self, "DEL", "Done recompressing." + msg, 10)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
diff --git a/rrompy/sampling/pivoted/sampling_engine_pivoted.py b/rrompy/sampling/pivoted/sampling_engine_pivoted.py
index d164869..2119cb4 100644
--- a/rrompy/sampling/pivoted/sampling_engine_pivoted.py
+++ b/rrompy/sampling/pivoted/sampling_engine_pivoted.py
@@ -1,128 +1,128 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.sampling.base.sampling_engine_base_pivoted import (
SamplingEngineBasePivoted)
from rrompy.hfengines.base import MarginalProxyEngine
from rrompy.utilities.base.types import Np1D, paramVal, paramList, sampList
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.exception_manager import RROMPyException
from rrompy.utilities.poly_fitting.polynomial import nextDerivativeIndices
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList
__all__ = ['SamplingEnginePivoted']
class SamplingEnginePivoted(SamplingEngineBasePivoted):
"""HERE"""
def preprocesssamples(self, idxs:Np1D, j:int) -> sampList:
if self.samples[j] is None or len(self.samples[j]) == 0: return
return self.samples[j](idxs)
def postprocessu(self, u:sampList, j:int,
overwrite : bool = False) -> Np1D:
return copy(u)
def postprocessuBulk(self, u:sampList, j:int) -> sampList:
return copy(u)
def lastSampleManagement(self, j:int):
pass
def _getSampleConcurrence(self, mu:paramVal, j:int, previous:Np1D,
homogeneized : bool = False) -> sampList:
if len(previous) >= len(self._derIdxs[j]):
self._derIdxs[j] += nextDerivativeIndices(
self._derIdxs[j], self.nPivot,
len(previous) + 1 - len(self._derIdxs[j]))
derIdx = self._derIdxs[j][len(previous)]
mu = checkParameter(mu, self.nPivot)
samplesOld = self.preprocesssamples(previous, j)
RHS = self.HFEngineMarginalized.b(mu, derIdx,
homogeneized = homogeneized)
for j, derP in enumerate(self._derIdxs[j][: len(previous)]):
diffP = [x - y for (x, y) in zip(derIdx, derP)]
if np.all([x >= 0 for x in diffP]):
RHS -= self.HFEngineMarginalized.A(mu, diffP).dot(
samplesOld[j])
return self.solveLS(mu, RHS = RHS, homogeneized = homogeneized)
def nextSample(self, mu:paramVal, j:int, overwrite : bool = False,
homogeneized : bool = False,
lastSample : bool = True) -> Np1D:
mu = checkParameter(mu, self.nPivot)
ns = self.nsamples[j]
muidxs = self.mus[j].findall(mu[0])
if len(muidxs) > 0:
u = self._getSampleConcurrence(mu, j, np.sort(muidxs),
homogeneized)
else:
u = self.solveLS(mu, homogeneized = homogeneized)
u = self.postprocessu(u, j, overwrite = overwrite)
if overwrite:
self.samples[j][ns] = u
self.mus[j][ns] = mu[0]
else:
if ns == 0:
self.samples[j] = sampleList(u)
else:
self.samples[j].append(u)
self.mus[j].append(mu)
self.nsamples[j] += 1
if lastSample: self.lastSampleManagement(j)
return u
def iterSample(self, mus:paramList, musM:paramList,
homogeneized : bool = False) -> sampList:
mus = checkParameterList(mus, self.nPivot)[0]
musM = checkParameterList(musM, self.nMarginal)[0]
vbMng(self, "INIT", "Starting sampling iterations.", 5)
n = len(mus)
m = len(musM)
if n <= 0:
raise RROMPyException("Number of samples must be positive.")
if m <= 0:
raise RROMPyException(("Number of marginal samples must be "
"positive."))
for j in range(m):
muMEff = [fp] * self.HFEngine.npar
for k, x in enumerate(self.directionMarginal):
muMEff[x] = musM(j, k)
self.HFEngineMarginalized = MarginalProxyEngine(self.HFEngine,
list(muMEff))
if self.allowRepeatedSamples:
for k in range(n):
vbMng(self, "MAIN",
"Computing sample {} / {} for marginal {} / {}."\
.format(k + 1, n, j, m), 10)
self.nextSample(mus[k], j, overwrite = (k > 0),
homogeneized = homogeneized,
lastSample = (n == k + 1))
- if k == 0:
+ if n > 1 and k == 0:
self.preallocateSamples(self.samples[j][0], mus[0],
n, j)
else:
self.samples[j] = self.postprocessuBulk(self.solveLS(mus,
homogeneized = homogeneized), j)
self.mus[j] = copy(mus)
self.nsamples[j] = n
vbMng(self, "DEL", "Finished sampling iterations.", 5)
return self.samples[j]
diff --git a/rrompy/sampling/standard/sampling_engine_standard.py b/rrompy/sampling/standard/sampling_engine_standard.py
index cc283b5..50ab0f7 100644
--- a/rrompy/sampling/standard/sampling_engine_standard.py
+++ b/rrompy/sampling/standard/sampling_engine_standard.py
@@ -1,112 +1,112 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.sampling.base.sampling_engine_base import SamplingEngineBase
from rrompy.utilities.base.types import Np1D, paramVal, paramList, sampList
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyException
from rrompy.utilities.poly_fitting.polynomial import nextDerivativeIndices
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList
__all__ = ['SamplingEngineStandard']
class SamplingEngineStandard(SamplingEngineBase):
"""HERE"""
def preprocesssamples(self, idxs:Np1D) -> sampList:
if self.samples is None or len(self.samples) == 0: return
return self.samples(idxs)
def postprocessu(self, u:sampList, overwrite : bool = False) -> Np1D:
return copy(u)
def postprocessuBulk(self, u:sampList) -> sampList:
return copy(u)
def lastSampleManagement(self):
pass
def _getSampleConcurrence(self, mu:paramVal, previous:Np1D,
homogeneized : bool = False) -> sampList:
if len(previous) >= len(self._derIdxs):
self._derIdxs += nextDerivativeIndices(self._derIdxs,
self.HFEngine.npar,
len(previous) + 1 - len(self._derIdxs))
derIdx = self._derIdxs[len(previous)]
mu = checkParameter(mu, self.HFEngine.npar)
samplesOld = self.preprocesssamples(previous)
RHS = self.HFEngine.b(mu, derIdx, homogeneized = homogeneized)
for j, derP in enumerate(self._derIdxs[: len(previous)]):
diffP = [x - y for (x, y) in zip(derIdx, derP)]
if np.all([x >= 0 for x in diffP]):
RHS -= self.HFEngine.A(mu, diffP).dot(samplesOld[j])
return self.solveLS(mu, RHS = RHS, homogeneized = homogeneized)
def nextSample(self, mu : paramVal = [], overwrite : bool = False,
homogeneized : bool = False,
lastSample : bool = True) -> Np1D:
mu = checkParameter(mu, self.HFEngine.npar)
ns = self.nsamples
muidxs = self.mus.findall(mu[0])
if len(muidxs) > 0:
u = self._getSampleConcurrence(mu, np.sort(muidxs), homogeneized)
else:
u = self.solveLS(mu, homogeneized = homogeneized)
u = self.postprocessu(u, overwrite = overwrite)
if overwrite:
self.samples[ns] = u
self.mus[ns] = mu[0]
else:
if ns == 0:
self.samples = sampleList(u)
else:
self.samples.append(u)
self.mus.append(mu)
self.nsamples += 1
if lastSample: self.lastSampleManagement()
return u
def iterSample(self, mus:paramList,
homogeneized : bool = False) -> sampList:
mus = checkParameterList(mus, self.HFEngine.npar)[0]
vbMng(self, "INIT", "Starting sampling iterations.", 5)
n = len(mus)
if n <= 0:
raise RROMPyException(("Number of samples must be positive."))
self.resetHistory()
if self.allowRepeatedSamples:
for j in range(n):
vbMng(self, "MAIN",
"Computing sample {} / {}.".format(j + 1, n), 7)
self.nextSample(mus[j], overwrite = (j > 0),
homogeneized = homogeneized,
lastSample = (n == j + 1))
- if j == 0:
+ if n > 1 and j == 0:
self.preallocateSamples(self.samples[0], mus[0], n)
else:
self.samples = self.postprocessuBulk(self.solveLS(mus,
homogeneized = homogeneized))
self.mus = copy(mus)
self.nsamples = n
vbMng(self, "DEL", "Finished sampling iterations.", 5)
return self.samples
diff --git a/rrompy/utilities/exception_manager/exception_manager.py b/rrompy/utilities/exception_manager/exception_manager.py
index 89de98e..c019850 100644
--- a/rrompy/utilities/exception_manager/exception_manager.py
+++ b/rrompy/utilities/exception_manager/exception_manager.py
@@ -1,22 +1,32 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
__all__ = ["RROMPyException"]
+def purgeVerbosityDepth():
+ from rrompy.utilities.base.verbosity_depth import verbosityDepth
+ while True:
+ try:
+ verbosityDepth("DEL", "", "", False)
+ except:
+ break
+
class RROMPyException(Exception):
- pass
+ def __init__(self, message):
+ purgeVerbosityDepth()
+ super().__init__(message)
diff --git a/rrompy/utilities/numerical/nonlinear_eigenproblem.py b/rrompy/utilities/numerical/nonlinear_eigenproblem.py
index e43ba93..73f1e2d 100644
--- a/rrompy/utilities/numerical/nonlinear_eigenproblem.py
+++ b/rrompy/utilities/numerical/nonlinear_eigenproblem.py
@@ -1,108 +1,111 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
import scipy.linalg as scla
#import scipy.sparse as scsp
from rrompy.utilities.base.types import Tuple, List, Np1D, Np2D
from .custom_pinv import customPInv
__all__ = ['linearizeDense', 'eigNonlinearDense', 'eigvalsNonlinearDense']
def linearizeDense(As:List[Np2D], jSupp : int = 1) -> Tuple[Np2D, Np2D]:
N = len(As)
n = As[0].shape[0]
stiff = np.zeros(((N - 1) * n, (N - 1) * n), dtype = As[0].dtype)
mass = np.zeros(((N - 1) * n, (N - 1) * n), dtype = As[0].dtype)
- if isinstance(jSupp, str) and jSupp.upper() == "COMPANION":
- II = np.arange(n, (N - 1) * n)
- stiff = np.pad(- np.hstack(As[-2 :: -1]),
- [[0, (N - 2) * n], [0, 0]], "constant")
- stiff[II, II - n] = 1.
- mass = np.pad(As[-1], [0, (N - 2) * n], "constant")
- mass[II, II] = 1.
- else:
- for j in range(jSupp):
- for k in range(jSupp - j - 1, jSupp):
- mass[n * j : n * (j + 1), k * n : (k + 1) * n] = \
+ if N > 1:
+ if isinstance(jSupp, str) and jSupp.upper() == "COMPANION":
+ II = np.arange(n, (N - 1) * n)
+ stiff = np.pad(- np.hstack(As[-2 :: -1]),
+ [[0, (N - 2) * n], [0, 0]], "constant")
+ stiff[II, II - n] = 1.
+ mass = np.pad(As[-1], [0, (N - 2) * n], "constant")
+ mass[II, II] = 1.
+ else:
+ for j in range(jSupp):
+ for k in range(jSupp - j - 1, jSupp):
+ mass[n * j : n * (j + 1), k * n : (k + 1) * n] = \
As[N - 2 + jSupp - k - j]
- for j in range(jSupp - 1, N - 1):
- for k in range(jSupp, N - 1 + jSupp - j):
- stiff[n * j : n * (j + 1), (k - 1) * n : k * n] = \
+ for j in range(jSupp - 1, N - 1):
+ for k in range(jSupp, N - 1 + jSupp - j):
+ stiff[n * j : n * (j + 1), (k - 1) * n : k * n] = \
- As[jSupp - k + N - 2 - j]
- stiff[: n * (jSupp - 1), : n * (jSupp - 1)] = mass[: n * (jSupp - 1),
- n : n * jSupp]
- mass[n * jSupp :, n * jSupp :] = stiff[n * (jSupp - 1) : - n,
- n * jSupp :]
+ stiff[: n * (jSupp - 1), : n * (jSupp - 1)] = \
+ mass[: n * (jSupp - 1), n : n * jSupp]
+ mass[n * jSupp :, n * jSupp :] = stiff[n * (jSupp - 1) : - n,
+ n * jSupp :]
return stiff, mass
def eigNonlinearDense(As:List[Np2D], jSupp : int = 1,
return_inverse : bool = False,
**kwargs_eig) -> Tuple[Np1D, Np2D]:
stiff, mass = linearizeDense(As, jSupp)
- ev, P = scla.eig(stiff, mass, overwrite_a = True, overwrite_b = True,
- **kwargs_eig)
+ if stiff.shape[0] == 0: return stiff, stiff
+ ev, P = scla.eig(stiff, mass, overwrite_a = True, overwrite_b = True,
+ **kwargs_eig)
if not return_inverse: return ev, P
Pinv = customPInv(P)
return ev, P, Pinv
def eigvalsNonlinearDense(As:List[Np2D], jSupp : int = 1,
**kwargs_eigvals) -> Np1D:
stiff, mass = linearizeDense(As, jSupp)
+ if stiff.shape[0] == 0: return stiff
return scla.eigvals(stiff, mass, overwrite_a = True, **kwargs_eigvals)
#def linearizeSparse(As:List[Np2D], jSupp : int = 1) -> Tuple[Np2D, Np2D]:
# N = len(As)
# n = As[0].shape[0]
# if isinstance(jSupp, str) and jSupp.upper() == "COMPANION":
# II = np.arange(n, (N - 1) * n)
# III = np.arange((N - 2) * n + 1)
# IIII = np.arange(0, n ** 2, n)
# improve management of sparse As...
# Alist = - np.hstack([A.todense() for A in As[-2 :: -1]])
# stiffD = np.concatenate((Alist.flatten(), np.ones((N - 2) * n)))
# stiffP = np.concatenate(((N - 1) * IIII, (N - 1) * n ** 2 + III))
# stiffI = np.concatenate((np.tile(np.arange((N - 1) * n), n), II - n))
# massD = np.concatenate((As[-1].todense().flatten(),
# np.ones((N - 2) * n)))
# massP = np.concatenate((IIII, n ** 2 + III))
# massI = np.concatenate((np.tile(np.arange(n), n), II))
# else:
# compute stiffD, stiffP, stiffI depending on jSupp
# compute massD, massP, massI depending on jSupp
# stiff = scsp.csr_matrix((stiffD, stiffI, stiffP),
# shape = ((N - 1) * n, (N - 1) * n))
# mass = scsp.csr_matrix((massD, massI, massP),
# shape = ((N - 1) * n, (N - 1) * n))
# return stiff, mass
#
#def eigNonlinearSparse(As:List[Np2D], jSupp : int = 1,
# return_inverse : bool = False,
# **kwargs_eig) -> Tuple[Np1D, Np2D]:
# stiff, mass = linearizeSparse(As, jSupp)
# ev, P = scsp.linalg.eig(stiff, M = mass, return_eigenvectors = True,
# **kwargs_eig)
# if not return_inverse: return ev, P
# Pinv = customPInv(P)
# return ev, P, Pinv
#
#def eigvalsNonlinearSparse(As:List[Np2D], jSupp : int = 1,
# **kwargs_eigvals) -> Np1D:
# stiff, mass = linearizeSparse(As, jSupp)
# return scsp.linalg.eig(stiff, M = mass, return_eigenvectors = False,
# **kwargs_eigvals)