diff --git a/rrompy/parameter/parameter_sampling/__init__.py b/rrompy/parameter/parameter_sampling/__init__.py
index 746a0ae..12e89ee 100644
--- a/rrompy/parameter/parameter_sampling/__init__.py
+++ b/rrompy/parameter/parameter_sampling/__init__.py
@@ -1,33 +1,35 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from .generic_sampler import GenericSampler
from .fft_sampler import FFTSampler
from .manual_sampler import ManualSampler
from .quadrature_sampler import QuadratureSampler
+from .quadrature_sampler_total import QuadratureSamplerTotal
from .random_sampler import RandomSampler
__all__ = [
'GenericSampler',
'FFTSampler',
'ManualSampler',
'QuadratureSampler',
+ 'QuadratureSamplerTotal',
'RandomSampler'
]
diff --git a/rrompy/parameter/parameter_sampling/fft_sampler.py b/rrompy/parameter/parameter_sampling/fft_sampler.py
index a41f44b..8e2eb8d 100644
--- a/rrompy/parameter/parameter_sampling/fft_sampler.py
+++ b/rrompy/parameter/parameter_sampling/fft_sampler.py
@@ -1,51 +1,50 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from .generic_sampler import GenericSampler
from rrompy.utilities.base.types import Np1D, List, paramList
from rrompy.utilities.base import lowDiscrepancy, kroneckerer
from rrompy.parameter import checkParameterList
__all__ = ['FFTSampler']
class FFTSampler(GenericSampler):
"""Generator of FFT-type sample points on scaled roots of unity."""
def generatePoints(self, n:List[int], reorder : bool = True) -> paramList:
"""Array of sample points."""
if not hasattr(n, "__len__"): n = [n]
super().generatePoints(n)
nleft, nright = 1, np.prod(n)
xmat = np.empty((nright, self.npar), dtype = np.complex)
for d in range(self.npar):
nright //= n[d]
a, b = self.lims(0, d), self.lims(1, d)
if self.scaling is not None:
a, b = self.scaling[d](a), self.scaling[d](b)
c, r = (a + b) / 2., np.abs(a - b) / 2.
xd = c + r * np.exp(1.j * np.linspace(0, 2 * np.pi, n[d] + 1)[:-1])
- if reorder:
- fejerOrdering = lowDiscrepancy(n[d])
- xd = xd[fejerOrdering]
if self.scalingInv is not None:
xd = self.scalingInv[d](xd)
xmat[:, d] = kroneckerer(xd, nleft, nright)
nleft *= n[d]
+ if reorder:
+ xmat = xmat[lowDiscrepancy(np.prod(n)), :]
x, _ = checkParameterList(xmat, self.npar)
return x
diff --git a/rrompy/parameter/parameter_sampling/quadrature_sampler.py b/rrompy/parameter/parameter_sampling/quadrature_sampler.py
index 0900816..26cfbd1 100644
--- a/rrompy/parameter/parameter_sampling/quadrature_sampler.py
+++ b/rrompy/parameter/parameter_sampling/quadrature_sampler.py
@@ -1,86 +1,87 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from .generic_sampler import GenericSampler
-from rrompy.utilities.base.types import Np1D, List, paramList
+from rrompy.utilities.base.types import List, paramList
from rrompy.utilities.exception_manager import RROMPyException
from rrompy.utilities.base import lowDiscrepancy, kroneckerer
from rrompy.parameter import checkParameterList
__all__ = ['QuadratureSampler']
class QuadratureSampler(GenericSampler):
"""Generator of quadrature sample points."""
allowedKinds = ["UNIFORM", "CHEBYSHEV", "GAUSSLEGENDRE"]
def __init__(self, lims:paramList, kind : str = "UNIFORM",
scaling : List[callable] = None,
scalingInv : List[callable] = None):
super().__init__(lims = lims, scaling = scaling,
scalingInv = scalingInv)
self.kind = kind
def __str__(self) -> str:
return "{}_{}".format(super().__str__(), self.kind)
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
@property
def kind(self):
"""Value of kind."""
return self._kind
@kind.setter
def kind(self, kind):
if kind.upper() not in self.allowedKinds:
raise RROMPyException("Generator kind not recognized.")
self._kind = kind.upper()
def generatePoints(self, n:List[int], reorder : bool = True) -> paramList:
"""Array of sample points."""
if not hasattr(n, "__len__"): n = [n]
super().generatePoints(n)
nleft, nright = 1, np.prod(n)
xmat = np.empty((nright, self.npar), dtype = self.lims.dtype)
for d in range(self.npar):
nright //= n[d]
a, b = self.lims(0, d), self.lims(1, d)
if self.scaling is not None:
a, b = self.scaling[d](a), self.scaling[d](b)
c, r = (a + b) / 2., (a - b) / 2.
dAbs = 2. * np.abs(r)
if self.kind == "UNIFORM":
xd = np.linspace(a, b, n[d])
elif self.kind == "CHEBYSHEV":
nodes, _ = np.polynomial.chebyshev.chebgauss(n[d])
xd = c + r * nodes
elif self.kind == "GAUSSLEGENDRE":
nodes, _ = np.polynomial.legendre.leggauss(n[d])
xd = c + r * nodes[::-1]
- if len(xd) > 1 and reorder:
- fejerOrdering = [n[d] - 1] + lowDiscrepancy(n[d] - 1)
- xd = xd[fejerOrdering]
if self.scalingInv is not None:
xd = self.scalingInv[d](xd)
xmat[:, d] = kroneckerer(xd, nleft, nright)
nleft *= n[d]
+ nright = np.prod(n)
+ if nright > 1 and reorder:
+ fejerOrdering = [nright - 1] + list(lowDiscrepancy(nright - 1))
+ xmat = xmat[fejerOrdering, :]
x, _ = checkParameterList(xmat, self.npar)
return x
diff --git a/rrompy/parameter/parameter_sampling/quadrature_sampler_total.py b/rrompy/parameter/parameter_sampling/quadrature_sampler_total.py
new file mode 100644
index 0000000..624148d
--- /dev/null
+++ b/rrompy/parameter/parameter_sampling/quadrature_sampler_total.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2018 by the RROMPy authors
+#
+# This file is part of RROMPy.
+#
+# RROMPy is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# RROMPy is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with RROMPy. If not, see .
+#
+
+from scipy.special import binom, factorial as fact
+from .quadrature_sampler import QuadratureSampler
+from rrompy.utilities.base.types import paramList
+from rrompy.utilities.base import lowDiscrepancy
+from rrompy.parameter import checkParameterList
+
+__all__ = ['QuadratureSamplerTotal']
+
+class QuadratureSamplerTotal(QuadratureSampler):
+ """
+ Generator of quadrature sample points for total degree polynomial
+ computations.
+ """
+
+ def generatePoints(self, n:int, reorder : bool = True) -> paramList:
+ """Array of sample points."""
+ if hasattr(n, "__len__"): n = n[0]
+ d = self.npar
+ n1d = int((fact(d) * n) ** (1. / d))
+ while binom(n1d + d - 1, d) > n: n1d -= 1
+ x = super().generatePoints([n1d] * d, reorder = True)[list(range(n))]
+ if not reorder:
+ fejerOrderingInv = lowDiscrepancy(n, inverse = True)
+ xmat = x.data[fejerOrderingInv, :]
+ x, _ = checkParameterList(xmat, d)
+ return x
+
diff --git a/rrompy/parameter/parameter_sampling/random_sampler.py b/rrompy/parameter/parameter_sampling/random_sampler.py
index cc68c21..a18583d 100644
--- a/rrompy/parameter/parameter_sampling/random_sampler.py
+++ b/rrompy/parameter/parameter_sampling/random_sampler.py
@@ -1,73 +1,76 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from .generic_sampler import GenericSampler
+from rrompy.utilities.base.halton import haltonGenerate
from rrompy.utilities.base.sobol import sobolGenerate
from rrompy.utilities.base.types import Np1D, List, paramList
from rrompy.utilities.exception_manager import RROMPyException
from rrompy.parameter import checkParameterList
__all__ = ['RandomSampler']
class RandomSampler(GenericSampler):
"""Generator of quadrature sample points."""
- allowedKinds = ["UNIFORM", "SOBOL"]
+ allowedKinds = ["UNIFORM", "HALTON", "SOBOL"]
def __init__(self, lims:paramList, kind : str = "UNIFORM",
scaling : List[callable] = None,
scalingInv : List[callable] = None):
super().__init__(lims = lims, scaling = scaling,
scalingInv = scalingInv)
self.kind = kind
def __str__(self) -> str:
return "{}_{}".format(super().__str__(), self.kind)
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
@property
def kind(self):
"""Value of kind."""
return self._kind
@kind.setter
def kind(self, kind):
if kind.upper() not in self.allowedKinds:
raise RROMPyException("Generator kind not recognized.")
self._kind = kind.upper()
- def generatePoints(self, n:int, seed : int = 420) -> paramList:
+ def generatePoints(self, n:int, seed : int = 42) -> paramList:
"""Array of quadrature points."""
if hasattr(n, "__len__"): n = n[0]
if self.kind == "UNIFORM":
np.random.seed(seed)
xmat = np.random.uniform(size = (n, self.npar))
+ elif self.kind == "HALTON":
+ xmat = haltonGenerate(self.npar, n, seed)
else:
xmat = sobolGenerate(self.npar, n, seed)
for d in range(self.npar):
a, b = self.lims(0, d), self.lims(1, d)
if self.scaling is not None:
a, b = self.scaling[d](a), self.scaling[d](b)
xmat[:, d] = a + (b - a) * xmat[:, d]
if self.scalingInv is not None:
xmat[:, d] = self.scalingInv[d](xmat[:, d])
x, _ = checkParameterList(xmat, self.npar)
return x
-
+
diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py
index 2571ce9..7527e3c 100644
--- a/rrompy/reduction_methods/base/generic_approximant.py
+++ b/rrompy/reduction_methods/base/generic_approximant.py
@@ -1,884 +1,890 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from abc import abstractmethod
import numpy as np
from itertools import product as iterprod
from copy import deepcopy as copy
from os import remove as osrm
from rrompy.sampling.linear_problem import (SamplingEngineLinear,
SamplingEngineLinearPOD)
from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, ListAny,
strLst, paramVal, paramList, sampList)
from rrompy.utilities.base import purgeDict, verbosityDepth, getNewFilename
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPy_READY, RROMPy_FRAGILE)
from rrompy.utilities.base import pickleDump, pickleLoad
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import sampleList, emptySampleList
__all__ = ['GenericApproximant']
def addNormFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, homogeneized : bool = False) -> float:
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
val = self.HFEngine.norm(uV)
return val
setattr(self.__class__, "norm" + fieldName, objFunc)
def addPlotFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, name : str = fieldName, save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, show : bool = True,
homogeneized : bool = False, **figspecs):
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
for j, u in enumerate(uV):
self.HFEngine.plot(u, name = name + str(j), save = save,
what = what, saveFormat = saveFormat,
saveDPI = saveDPI, show = show, **figspecs)
setattr(self.__class__, "plot" + fieldName, objFunc)
def addOutParaviewFieldToClass(self, fieldName):
def objFunc(self, mu:paramVal, name : str = fieldName,
filename : str = "out", time : float = 0.,
what : strLst = 'all', forceNewFile : bool = True,
folder : bool = False, filePW = None,
homogeneized : bool = False):
if not hasattr(self.HFEngine, "outParaview"):
raise RROMPyException(("High fidelity engine cannot output to "
"Paraview."))
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
for j, u in enumerate(uV):
self.HFEngine.outParaview(u, name = name + str(j),
filename = filename, time = time,
what = what, forceNewFile = forceNewFile,
folder = folder, filePW = filePW)
setattr(self.__class__, "outParaview" + fieldName, objFunc)
def addOutParaviewTimeDomainFieldToClass(self, fieldName):
def objFunc(self, mu:paramVal, omega : float = None,
timeFinal : float = None, periodResolution : int = 20,
name : str = fieldName, filename : str = "out",
forceNewFile : bool = True, folder : bool = False,
homogeneized : bool = False):
if not hasattr(self.HFEngine, "outParaviewTimeDomain"):
raise RROMPyException(("High fidelity engine cannot output to "
"Paraview."))
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
if omega is None: omega = np.real(mu)
for j, u in enumerate(uV):
self.HFEngine.outParaviewTimeDomain(u, omega = omega,
timeFinal = timeFinal,
periodResolution = periodResolution,
name = name + str(j),
filename = filename,
forceNewFile = forceNewFile,
folder = folder)
setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc)
class GenericApproximant:
"""
ABSTRACT
ROM approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
trainedModel: Trained model evaluator.
mu0: Default parameter.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList{Soft,Critical}.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
S: Number of solution snapshots over which current approximant is
based upon.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uAppReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApp as sampleList.
lastSolvedAppReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApp: Approximate solution(s) with parameter(s) lastSolvedApp as
sampleList.
lastSolvedApp: Parameter(s) corresponding to last computed approximate
solution(s) as parameterList.
"""
__all__ += [ftype + dtype for ftype, dtype in iterprod(
["norm", "plot", "outParaview", "outParaviewTimeDomain"],
["HF", "RHS", "Approx", "Res", "Err"])]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._mode = RROMPy_READY
self.verbosity = verbosity
self.timestamp = timestamp
if self.verbosity >= 10:
verbosityDepth("INIT", ("Initializing approximant engine of "
"type {}.").format(self.name()),
timestamp = self.timestamp)
self._HFEngine = HFEngine
self.trainedModel = None
self.lastSolvedHF = emptyParameterList()
self.uHF = emptySampleList()
self._addParametersToList(["POD"], [True], ["S"], [1])
if mu0 is None:
if hasattr(self.HFEngine, "mu0"):
self.mu0 = checkParameter(self.HFEngine.mu0)
else:
raise RROMPyException(("Center of approximation cannot be "
"inferred from HF engine. Parameter "
"required"))
else:
self.mu0 = checkParameter(mu0, self.HFEngine.npar)
self.resetSamples()
self.homogeneized = homogeneized
self.approxParameters = approxParameters
self._postInit()
### add norm{HF,RHS,Approx,Res,Err} methods
"""
Compute norm of * at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Target norm of *.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addNormFieldToClass(self, objName)
### add plot{HF,RHS,Approx,Res,Err} methods
"""
Do some nice plots of * at arbitrary parameter.
Args:
mu: Target parameter.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addPlotFieldToClass(self, objName)
### add outParaview{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file.
Args:
mu: Target parameter.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
time(optional): Timestamp.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
filePW(optional): Fenics File entity (for time series).
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewFieldToClass(self, objName)
### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file, converted to time domain.
Args:
mu: Target parameter.
omega(optional): frequency.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewTimeDomainFieldToClass(self, objName)
def _preInit(self):
if not hasattr(self, "depth"): self.depth = 0
else: self.depth += 1
@property
def parameterList(self):
"""Value of parameterListSoft + parameterListCritical."""
return self.parameterListSoft + self.parameterListCritical
def _addParametersToList(self, whatSoft:strLst, defaultSoft:ListAny,
whatCritical : strLst = [],
- defaultCritical : ListAny = []):
+ defaultCritical : ListAny = [],
+ toBeExcluded : strLst = []):
+ if not hasattr(self, "parameterToBeExcluded"):
+ self.parameterToBeExcluded = []
+ self.parameterToBeExcluded += toBeExcluded
if not hasattr(self, "parameterListSoft"):
self.parameterListSoft = []
if not hasattr(self, "parameterDefaultSoft"):
self.parameterDefaultSoft = {}
if not hasattr(self, "parameterListCritical"):
self.parameterListCritical = []
if not hasattr(self, "parameterDefaultCritical"):
self.parameterDefaultCritical = {}
- self.parameterListSoft += whatSoft
for j, what in enumerate(whatSoft):
- self.parameterDefaultSoft[what] = defaultSoft[j]
- self.parameterListCritical += whatCritical
+ if what not in self.parameterToBeExcluded:
+ self.parameterListSoft += [what]
+ self.parameterDefaultSoft[what] = defaultSoft[j]
for j, what in enumerate(whatCritical):
- self.parameterDefaultCritical[what] = defaultCritical[j]
+ if what not in self.parameterToBeExcluded:
+ self.parameterListCritical += [what]
+ self.parameterDefaultCritical[what] = defaultCritical[j]
def _postInit(self):
if self.depth == 0:
if self.verbosity >= 10:
verbosityDepth("DEL", "Done initializing.",
timestamp = self.timestamp)
del self.depth
else: self.depth -= 1
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def setupSampling(self):
"""Setup sampling engine."""
RROMPyAssert(self._mode, message = "Cannot setup sampling engine.")
if not hasattr(self, "_POD") or self._POD is None: return
if self.POD:
SamplingEngine = SamplingEngineLinearPOD
else:
SamplingEngine = SamplingEngineLinear
self.samplingEngine = SamplingEngine(self.HFEngine,
verbosity = self.verbosity)
@property
def HFEngine(self):
"""Value of HFEngine."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
raise RROMPyException("Cannot change HFEngine.")
@property
def mu0(self):
"""Value of mu0."""
return self._mu0
@mu0.setter
def mu0(self, mu0):
mu0 = checkParameter(mu0)
if not hasattr(self, "_mu0") or mu0 != self.mu0:
self.resetSamples()
self._mu0 = mu0
@property
def npar(self):
"""Number of parameters."""
return self.mu0.shape[1]
@property
def approxParameters(self):
"""Value of approximant parameters."""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
if not hasattr(self, "approxParameters"):
self._approxParameters = {}
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
keyList = list(approxParameters.keys())
fragile = False
for key in self.parameterListCritical:
if key in keyList:
val = approxParameters[key]
else:
val = getattr(self, "_" + key, None)
if val is None:
val = self.parameterDefaultCritical[key]
getattr(self.__class__, key, None).fset(self, val)
fragile = fragile or val is None
for key in self.parameterListSoft:
if key in keyList:
val = approxParameters[key]
else:
val = getattr(self, "_" + key, None)
if val is None:
val = self.parameterDefaultSoft[key]
getattr(self.__class__, key, None).fset(self, val)
if fragile:
self._mode = RROMPy_FRAGILE
@property
def POD(self):
"""Value of POD."""
return self._POD
@POD.setter
def POD(self, POD):
if hasattr(self, "_POD"): PODold = self.POD
else: PODold = -1
self._POD = POD
self._approxParameters["POD"] = self.POD
if PODold != self.POD:
self.samplingEngine = None
self.resetSamples()
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
if not hasattr(S, "__len__"): S = [S]
if any([s <= 0 for s in S]):
raise RROMPyException("S must be positive.")
if hasattr(self, "_S") and self._S is not None: Sold = tuple(self.S)
else: Sold = -1
self._S = S
self._approxParameters["S"] = self.S
if Sold != tuple(self.S):
self.resetSamples()
@property
def homogeneized(self):
"""Value of homogeneized."""
return self._homogeneized
@homogeneized.setter
def homogeneized(self, homogeneized):
if not hasattr(self, "_homogeneized"):
self._homogeneized = None
if homogeneized != self.homogeneized:
self._homogeneized = homogeneized
self.resetSamples()
@property
def trainedModel(self):
"""Value of trainedModel."""
return self._trainedModel
@trainedModel.setter
def trainedModel(self, trainedModel):
self._trainedModel = trainedModel
self.lastSolvedAppReduced = emptyParameterList()
self.lastSolvedApp = emptyParameterList()
self.uAppReduced = emptySampleList()
self.uApp = emptySampleList()
def resetSamples(self):
if hasattr(self, "samplingEngine") and self.samplingEngine is not None:
self.samplingEngine.resetHistory()
else:
self.setupSampling()
self._mode = RROMPy_READY
def plotSamples(self, name : str = "u", save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, **figspecs):
"""
Do some nice plots of the samples.
Args:
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
RROMPyAssert(self._mode, message = "Cannot plot samples.")
self.samplingEngine.plotSamples(name = name, save = save, what = what,
saveFormat = saveFormat,
saveDPI = saveDPI,
**figspecs)
def outParaviewSamples(self, name : str = "u", filename : str = "out",
times : Np1D = None, what : strLst = 'all',
forceNewFile : bool = True, folders : bool = False,
filePW = None):
"""
Output samples to ParaView file.
Args:
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
times(optional): Timestamps.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
folders(optional): Whether to split output in folders.
filePW(optional): Fenics File entity (for time series).
"""
RROMPyAssert(self._mode, message = "Cannot output samples.")
self.samplingEngine.outParaviewSamples(name = name,
filename = filename,
times = times, what = what,
forceNewFile = forceNewFile,
folders = folders,
filePW = filePW)
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
timeFinal : Np1D = None,
periodResolution : int = 20,
name : str = "u",
filename : str = "out",
forceNewFile : bool = True,
folders : bool = False):
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
folders(optional): Whether to split output in folders.
"""
RROMPyAssert(self._mode, message = "Cannot output samples.")
self.samplingEngine.outParaviewTimeDomainSamples(omegas = omegas,
timeFinal = timeFinal,
periodResolution = periodResolution,
name = name, filename = filename,
forceNewFile = forceNewFile,
folders = folders)
def setTrainedModel(self, model):
"""Deepcopy approximation from trained model."""
if hasattr(model, "storeTrainedModel"):
verb = model.verbosity
model.verbosity = 0
fileOut = model.storeTrainedModel()
model.verbosity = verb
else:
try:
fileOut = getNewFilename("trained_model", "pkl")
pickleDump(model.data.__dict__, fileOut)
except:
raise RROMPyException(("Failed to store model data. Parameter "
"model must have either "
"storeTrainedModel or "
"data.__dict__ properties."))
self.loadTrainedModel(fileOut)
osrm(fileOut)
@abstractmethod
def setupApprox(self):
"""
Setup approximant. (ABSTRACT)
Any specialization should include something like
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
...
self.trainedModel = ...
self.trainedModel.data = ...
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
"""
pass
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None
and self.trainedModel.data.approxParameters == self.approxParameters)
def setHF(self, muHF:paramList, uHF:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
newSolvedHF, _ = checkParameterList(muHF, self.npar)
newuHF = sampleList(uHF)
if append:
self.lastSolvedHF.append(newSolvedHF)
self.uHF.append(newuHF)
return list(range(len(self.uHF) - len(newuHF), len(self.uHF)))
self.lastSolvedHF, _ = checkParameterList(newSolvedHF, self.npar)
self.uHF = sampleList(newuHF)
return list(range(len(self.uHF)))
def evalHF(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Find high fidelity solution with original parameters and arbitrary
parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
mu, _ = checkParameterList(mu, self.npar)
idx = np.empty(len(mu), dtype = np.int)
if prune:
jExtra = np.zeros(len(mu), dtype = bool)
muKeep = emptyParameterList()
muExtra = copy(muKeep)
for j in range(len(mu)):
jPos = self.lastSolvedHF.find(mu[j])
if jPos is not None:
idx[j] = jPos
muKeep.append(mu[j])
else:
jExtra[j] = True
muExtra.append(mu[j])
if len(muKeep) > 0 and not append:
idx[~jExtra] = self.setHF(muKeep, self.uHF[idx[~jExtra]],
append)
append = True
else:
jExtra = np.ones(len(mu), dtype = bool)
muExtra = mu
if len(muExtra) > 0:
newuHFs = self.samplingEngine.solveLS(muExtra,
homogeneized = self.homogeneized)
idx[jExtra] = self.setHF(muExtra, newuHFs, append)
return list(idx)
def setApproxReduced(self, muApp:paramList, uApp:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
newSolvedApp, _ = checkParameterList(muApp, self.npar)
newuApp = sampleList(uApp)
if append:
self.lastSolvedAppReduced.append(newSolvedApp)
self.uAppReduced.append(newuApp)
return list(range(len(self.uAppReduced) - len(newuApp),
len(self.uAppReduced)))
self.lastSolvedAppReduced, _ = checkParameterList(newSolvedApp,
self.npar)
self.uAppReduced = sampleList(newuApp)
return list(range(len(self.uAppReduced)))
def evalApproxReduced(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
self.setupApprox()
mu, _ = checkParameterList(mu, self.npar)
idx = np.empty(len(mu), dtype = np.int)
if prune:
jExtra = np.zeros(len(mu), dtype = bool)
muKeep = emptyParameterList()
muExtra = copy(muKeep)
for j in range(len(mu)):
jPos = self.lastSolvedAppReduced.find(mu[j])
if jPos is not None:
idx[j] = jPos
muKeep.append(mu[j])
else:
jExtra[j] = True
muExtra.append(mu[j])
if len(muKeep) > 0 and not append:
idx[~jExtra] = self.setApproxReduced(muKeep,
self.uAppReduced[idx[~jExtra]],
append)
append = True
else:
jExtra = np.ones(len(mu), dtype = bool)
muExtra = mu
if len(muExtra) > 0:
newuApps = self.trainedModel.getApproxReduced(muExtra)
idx[jExtra] = self.setApproxReduced(muExtra, newuApps, append)
return list(idx)
def setApprox(self, muApp:paramList, uApp:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
newSolvedApp, _ = checkParameterList(muApp, self.npar)
newuApp = sampleList(uApp)
if append:
self.lastSolvedApp.append(newSolvedApp)
self.uApp.append(newuApp)
return list(range(len(self.uApp) - len(newuApp), len(self.uApp)))
self.lastSolvedApp, _ = checkParameterList(newSolvedApp, self.npar)
self.uApp = sampleList(newuApp)
return list(range(len(self.uApp)))
def evalApprox(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Evaluate approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
self.setupApprox()
mu, _ = checkParameterList(mu, self.npar)
idx = np.empty(len(mu), dtype = np.int)
if prune:
jExtra = np.zeros(len(mu), dtype = bool)
muKeep = emptyParameterList()
muExtra = copy(muKeep)
for j in range(len(mu)):
jPos = self.lastSolvedApp.find(mu[j])
if jPos is not None:
idx[j] = jPos
muKeep.append(mu[j])
else:
jExtra[j] = True
muExtra.append(mu[j])
if len(muKeep) > 0 and not append:
idx[~jExtra] = self.setApprox(muKeep, self.uApp[idx[~jExtra]],
append)
append = True
else:
jExtra = np.ones(len(mu), dtype = bool)
muExtra = mu
if len(muExtra) > 0:
newuApps = self.trainedModel.getApprox(muExtra)
idx[jExtra] = self.setApprox(muExtra, newuApps, append)
return list(idx)
def getHF(self, mu:paramList, homogeneized : bool = False,
append : bool = False, prune : bool = True) -> sampList:
"""
Get HF solution at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
HFsolution.
"""
mu, _ = checkParameterList(mu, self.npar)
idx = self.evalHF(mu, append = append, prune = prune)
uHFs = self.uHF(idx)
if self.homogeneized and not homogeneized:
for j, m in enumerate(mu):
uHFs[j] += self.HFEngine.liftDirichletData(m)
if not self.homogeneized and homogeneized:
for j, m in enumerate(mu):
uHFs[j] -= self.HFEngine.liftDirichletData(m)
return uHFs
def getRHS(self, mu:paramList, homogeneized : bool = False) -> sampList:
"""
Get linear system RHS at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Linear system RHS.
"""
return self.HFEngine.residual(None, mu, homogeneized = homogeneized)
def getApproxReduced(self, mu:paramList, append : bool = False,
prune : bool = True) -> sampList:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Reduced approximant.
"""
mu, _ = checkParameterList(mu, self.npar)
idx = self.evalApproxReduced(mu, append = append, prune = prune)
uAppRs = self.uAppReduced(idx)
return uAppRs
def getApprox(self, mu:paramList, homogeneized : bool = False,
append : bool = False, prune : bool = True) -> sampList:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant.
"""
mu, _ = checkParameterList(mu, self.npar)
idx = self.evalApprox(mu, append = append, prune = prune)
uApps = self.uApp(idx)
if self.homogeneized and not homogeneized:
for j, m in enumerate(mu):
uApps[j] += self.HFEngine.liftDirichletData(m)
if not self.homogeneized and homogeneized:
for j, m in enumerate(mu):
uApps[j] -= self.HFEngine.liftDirichletData(m)
return uApps
def getRes(self, mu:paramList, homogeneized : bool = False) -> sampList:
"""
Get residual at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant residual.
"""
return self.HFEngine.residual(self.getApprox(mu, homogeneized), mu,
homogeneized = homogeneized)
def getErr(self, mu:paramList, homogeneized : bool = False) -> sampList:
"""
Get error at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant error.
"""
return self.getApprox(mu, homogeneized) - self.getHF(mu, homogeneized)
def getPoles(self) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
self.setupApprox()
if self.verbosity >= 20:
verbosityDepth("INIT", "Computing poles of model.",
timestamp = self.timestamp)
poles = self.trainedModel.getPoles()
if self.verbosity >= 20:
verbosityDepth("DEL", "Done computing poles.",
timestamp = self.timestamp)
return poles
def storeTrainedModel(self, filenameBase : str = "trained_model",
forceNewFile : bool = True) -> str:
"""Store trained reduced model to file."""
self.setupApprox()
if self.verbosity >= 20:
verbosityDepth("INIT", "Storing trained model to file.",
timestamp = self.timestamp)
if forceNewFile:
filename = getNewFilename(filenameBase, "pkl")
else:
filename = "{}.pkl".format(filenameBase)
pickleDump(self.trainedModel.data.__dict__, filename)
if self.verbosity >= 20:
verbosityDepth("DEL", "Done storing trained model.",
timestamp = self.timestamp)
return filename
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
if self.verbosity >= 20:
verbosityDepth("INIT", "Loading pre-trained model from file.",
timestamp = self.timestamp)
datadict = pickleLoad(filename)
name = datadict.pop("name")
if name == "TrainedModelPade":
from rrompy.reduction_methods.trained_model import \
TrainedModelPade as tModel
elif name == "TrainedModelRB":
from rrompy.reduction_methods.trained_model import \
TrainedModelRB as tModel
else:
raise RROMPyException(("Trained model name not recognized. "
"Loading failed."))
self.mu0 = datadict.pop("mu0")
from rrompy.reduction_methods.trained_model import TrainedModelData
trainedModel = tModel()
trainedModel.verbosity = self.verbosity
trainedModel.timestamp = self.timestamp
data = TrainedModelData(name, self.mu0, datadict.pop("projMat"),
datadict.pop("rescalingExp"))
if "mus" in datadict:
data.mus = datadict.pop("mus")
approxParameters = datadict.pop("approxParameters")
data.approxParameters = copy(approxParameters)
if "sampler" in approxParameters:
self._approxParameters["sampler"] = approxParameters.pop("sampler")
self.approxParameters = copy(approxParameters)
if "mus" in data.__dict__:
self.mus = copy(data.mus)
if name == "TrainedModelPade":
self.scaleFactor = datadict.pop("scaleFactor")
data.scaleFactor = self.scaleFactor
for key in datadict:
setattr(data, key, datadict[key])
trainedModel.data = data
self.trainedModel = trainedModel
self._mode = RROMPy_FRAGILE
if self.verbosity >= 20:
verbosityDepth("DEL", "Done loading pre-trained model.",
timestamp = self.timestamp)
diff --git a/rrompy/reduction_methods/centered/rational_pade.py b/rrompy/reduction_methods/centered/rational_pade.py
index cc086a2..20ea707 100644
--- a/rrompy/reduction_methods/centered/rational_pade.py
+++ b/rrompy/reduction_methods/centered/rational_pade.py
@@ -1,414 +1,434 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from rrompy.reduction_methods.trained_model import (TrainedModelData,
TrainedModelPade as tModel)
from .generic_centered_approximant import GenericCenteredApproximant
from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, DictAny, HFEng,
paramVal, paramList, sampList)
from rrompy.utilities.base import verbosityDepth
from rrompy.utilities.poly_fitting.polynomial import (nextDerivativeIndices,
hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
__all__ = ['RationalPade']
class RationalPade(GenericCenteredApproximant):
"""
ROM single-point fast Pade' approximant computation for parametric
problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon;
- 'M': degree of Pade' approximant numerator; defaults to 0;
- 'N': degree of Pade' approximant denominator; defaults to 0;
- 'robustTol': tolerance for robust Pade' denominator management;
defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'M': degree of Pade' approximant numerator;
- 'N': degree of Pade' approximant denominator;
- 'robustTol': tolerance for robust Pade' denominator management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon.
POD: Whether to compute QR factorization of derivatives.
S: Number of solution snapshots over which current approximant is
based upon.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
robustTol: Tolerance for robust Pade' denominator management.
E: Complete derivative depth level of S.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uAppReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApp as sampleList.
lastSolvedAppReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApp: Approximate solution(s) with parameter(s) lastSolvedApp as
sampleList.
lastSolvedApp: Parameter(s) corresponding to last computed approximate
solution(s) as parameterList.
G: Square Numpy 2D vector of size (N+1) corresponding to Pade'
denominator matrix (see paper).
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
- self._addParametersToList(["M", "N", "robustTol"], [0, 0, 0])
+ self._addParametersToList(["M", "N", "E", "robustTol"], [0, 0, -1, 0])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def M(self):
"""Value of M.."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
- if hasattr(self, "E") and self.E < self.M:
- RROMPyWarning("Prescribed S is too small. Decreasing M.")
+ if hasattr(self, "_E") and self.E < self.M:
+ RROMPyWarning("Prescribed E is too small. Decreasing M.")
self.M = self.E
@property
def N(self):
"""Value of N."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
- if hasattr(self, "E") and self.E < self.N:
- RROMPyWarning("Prescribed S is too small. Decreasing N.")
+ if hasattr(self, "_E") and self.E < self.N:
+ RROMPyWarning("Prescribed E is too small. Decreasing N.")
self.N = self.E
+ @property
+ def E(self):
+ """Value of E."""
+ return self._E
+ @E.setter
+ def E(self, E):
+ if E < 0:
+ if not hasattr(self, "_S"):
+ raise RROMPyException(("Value of E must be positive if S is "
+ "not yet assigned."))
+ E = np.sum(hashI(np.prod(self.S), self.npar)) - 1
+ self._E = E
+ self._approxParameters["E"] = self.E
+ if (hasattr(self, "_S")
+ and self.E >= np.sum(hashI(np.prod(self.S), self.npar))):
+ RROMPyWarning("Prescribed S is too small. Decreasing E.")
+ self.E = -1
+ if hasattr(self, "_M"): self.M = self.M
+ if hasattr(self, "_N"): self.N = self.N
+
@property
def robustTol(self):
"""Value of tolerance for robust Pade' denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
GenericCenteredApproximant.S.fset(self, S)
- self.E = np.sum(hashI(np.prod(self.S), self.npar)) - 1
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
+ if hasattr(self, "_E"): self.E = self.E
def _setupDenominator(self):
"""Compute Pade' denominator."""
RROMPyAssert(self._mode, message = "Cannot setup denominator.")
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of denominator.",
timestamp = self.timestamp)
while self.N > 0:
if self.POD:
ev, eV = self.findeveVGQR()
else:
ev, eV = self.findeveVGExplicit()
newParameters = checkRobustTolerance(ev, self.N, self.robustTol)
if not newParameters:
break
self.approxParameters = newParameters
if self.N <= 0:
eV = np.ones((1, 1))
q = np.zeros(tuple([self.N + 1] * self.npar), dtype = np.complex)
for j in range(eV.shape[0]):
q[tuple(hashI(j, self.npar))] = eV[j, 0]
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing denominator.",
timestamp = self.timestamp)
return q
def _setupNumerator(self):
"""Compute Pade' numerator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of numerator.",
timestamp = self.timestamp)
P = np.zeros(tuple([self.M + 1] * self.npar) + (np.prod(self.S),),
dtype = np.complex)
mEnd = hashD([self.M + 1] + [0] * (self.npar - 1))
nEnd = hashD([self.N + 1] + [0] * (self.npar - 1))
mnIdxs = nextDerivativeIndices([], self.npar, max(mEnd, nEnd))
for j in range(mEnd):
mIdx = mnIdxs[j]
for n in range(nEnd):
diffIdx = [x - y for (x, y) in zip(mIdx, mnIdxs[n])]
if all([x >= 0 for x in diffIdx]):
P[tuple(mIdx) + (hashD(diffIdx),)] = (
self.trainedModel.data.Q[tuple(mnIdxs[n])])
return self.rescaleByParameter(P).T
def setupApprox(self):
"""
Compute Pade' approximant. SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeDerivatives()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
None, self.HFEngine.rescalingExp)
data.polytype = "MONOMIAL"
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
if self.N > 0:
Q = self._setupDenominator()
else:
self.setScaleParameter()
Q = np.ones(1, dtype = np.complex)
self.trainedModel.data.Q = copy(Q)
self.trainedModel.data.scaleFactor = self.scaleFactor
self.trainedModel.data.projMat = copy(self.samplingEngine.samples(
list(range(np.prod(self.S)))))
P = self._setupNumerator()
if self.POD:
P = np.tensordot(self.samplingEngine.RPOD, P, axes = ([-1], [0]))
self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def setScaleParameter(self) -> Np2D:
"""Compute parameter for rescaling."""
RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.")
self.computeDerivatives()
self.scaleFactor = [1.] * self.npar
for d in range(self.npar):
hashesd = [0]
for n in range(1, self.E + 1):
hashesd += [hashD([0] * (d - 1) + [n]
+ [0] * (self.npar - d - 1))]
if self.POD:
Rd = self.samplingEngine.RPOD[: hashesd[-1] + 1, hashesd]
Gd = np.diag(Rd.T.conj().dot(Rd))
else:
DerEd = self.samplingEngine.samples(hashesd)
Gd = self.HFEngine.norm(DerEd)
scaleCoeffs = np.polyfit(np.arange(len(Gd)), np.log(Gd), 1)
self.scaleFactor[d] = np.exp(- scaleCoeffs[0] / 2.)
def rescaleByParameter(self, R:Np2D) -> Np2D:
"""
Rescale by scale parameter.
Args:
R: Matrix whose columns need rescaling.
Returns:
Rescaled matrix.
"""
RIdxs = nextDerivativeIndices([], self.npar, R.shape[-1])
Rscaled = copy(R)
for j, RIdx in enumerate(RIdxs):
Rscaled[..., j] *= np.prod([x ** y for (x, y) in
zip(self.scaleFactor, RIdx)])
return Rscaled
def buildG(self):
"""Assemble Pade' denominator matrix."""
RROMPyAssert(self._mode, message = "Cannot compute G matrix.")
self.computeDerivatives()
if self.verbosity >= 10:
verbosityDepth("INIT", "Building gramian matrix.",
timestamp = self.timestamp)
eStart = hashD([self.E] + [0] * (self.npar - 1))
eEnd = hashD([self.E + 1] + [0] * (self.npar - 1))
eIdxs = [hashI(e, self.npar) for e in range(eStart, eEnd)]
nEnd = hashD([self.N + 1] + [0] * (self.npar - 1))
nIdxs = nextDerivativeIndices([], self.npar, nEnd)
self.setScaleParameter()
if self.POD:
RPODE = self.rescaleByParameter(self.samplingEngine.RPOD[: eEnd,
: eEnd])
else:
DerE = self.rescaleByParameter(self.samplingEngine.samples(
list(range(eEnd))).data)
self.G = np.zeros((nEnd, nEnd), dtype = np.complex)
for eIdx in eIdxs:
nLoc = []
samplesIdxs = []
for n, nIdx in enumerate(nIdxs):
diffIdx = [x - y for (x, y) in zip(eIdx, nIdx)]
if all([x >= 0 for x in diffIdx]):
nLoc += [n]
samplesIdxs += [hashD(diffIdx)]
if self.POD:
RPODELoc = RPODE[: samplesIdxs[-1] + 1, samplesIdxs]
GLoc = RPODELoc.T.conj().dot(RPODELoc)
else:
DerELoc = DerE[:, samplesIdxs]
GLoc = self.HFEngine.innerProduct(DerELoc, DerELoc)
for j in range(len(nLoc)):
self.G[nLoc[j], nLoc] = self.G[nLoc[j], nLoc] + GLoc[j]
if self.verbosity >= 10:
verbosityDepth("DEL", "Done building gramian.",
timestamp = self.timestamp)
def findeveVGExplicit(self) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of Pade' denominator
matrix.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self.buildG()
if self.verbosity >= 7:
verbosityDepth("INIT",
"Solving eigenvalue problem for gramian matrix.",
timestamp = self.timestamp)
ev, eV = np.linalg.eigh(self.G)
if self.verbosity >= 5:
try: condev = ev[-1] / ev[0]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved eigenvalue problem of size {} "
"with condition number {:.4e}.").format(
self.G.shape[0],
condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return ev, eV
def findeveVGQR(self) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of Pade' denominator matrix
through SVD of R factor.
Returns:
Eigenvalues in ascending order and corresponding eigenvector
matrix.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
RROMPyAssert(self.POD, True, "POD value")
self.computeDerivatives()
eStart = hashD([self.E] + [0] * (self.npar - 1))
eEnd = hashD([self.E + 1] + [0] * (self.npar - 1))
eIdxs = [hashI(e, self.npar) for e in range(eStart, eEnd)]
nEnd = hashD([self.N + 1] + [0] * (self.npar - 1))
nIdxs = nextDerivativeIndices([], self.npar, nEnd)
self.setScaleParameter()
RPODE = self.rescaleByParameter(self.samplingEngine.RPOD[: eEnd,
: eEnd])
Rstack = np.zeros((RPODE.shape[0] * (eEnd - eStart), nEnd),
dtype = np.complex)
for k, eIdx in enumerate(eIdxs):
nLoc = []
samplesIdxs = []
for n, nIdx in enumerate(nIdxs):
diffIdx = [x - y for (x, y) in zip(eIdx, nIdx)]
if all([x >= 0 for x in diffIdx]):
nLoc += [n]
samplesIdxs += [hashD(diffIdx)]
RPODELoc = RPODE[:, samplesIdxs]
for j in range(len(nLoc)):
Rstack[k * RPODE.shape[0] : (k + 1) * RPODE.shape[0],
nLoc[j]] = RPODELoc[:, j]
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving svd for square root of "
"gramian matrix."),
timestamp = self.timestamp)
sizeI = Rstack.shape
_, s, V = np.linalg.svd(Rstack, full_matrices = False)
eV = V[::-1, :].T.conj()
if self.verbosity >= 5:
try: condev = s[0] / s[-1]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved svd problem of size {} x {} with "
"condition number {:.4e}.").format(*sizeI,
condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return s[::-1], eV
def centerNormalize(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Normalized parameter.
"""
return self.trainedModel.centerNormalize(mu, mu0)
def getResidues(self) -> sampList:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues()
diff --git a/rrompy/reduction_methods/distributed/rational_interpolant.py b/rrompy/reduction_methods/distributed/rational_interpolant.py
index 82ff842..d4f0218 100644
--- a/rrompy/reduction_methods/distributed/rational_interpolant.py
+++ b/rrompy/reduction_methods/distributed/rational_interpolant.py
@@ -1,508 +1,528 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from .generic_distributed_approximant import GenericDistributedApproximant
from rrompy.utilities.poly_fitting import customFit, customPInv
from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname,
nextDerivativeIndices,
hashDerivativeToIdx as hashD,
hashIdxToDerivative as hashI,
homogeneizedpolyvander)
from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple,
List, paramVal, paramList, sampList)
from rrompy.utilities.base import verbosityDepth, multifactorial
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
__all__ = ['RationalInterpolant']
class RationalInterpolant(GenericDistributedApproximant):
"""
ROM rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator;
- 'polybasis': type of polynomial basis for interpolation; allowed
values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults
to 'MONOMIAL';
- 'M': degree of Pade' interpolant numerator; defaults to 0;
- 'N': degree of Pade' interpolant denominator; defaults to 0;
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust Pade' denominator management;
defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'polybasis': type of polynomial basis for interpolation;
- 'M': degree of Pade' interpolant numerator;
- 'N': degree of Pade' interpolant denominator;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
- 'robustTol': tolerance for robust Pade' denominator management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
POD: Whether to compute POD of snapshots.
S: Number of solution snapshots over which current approximant is
based upon.
sampler: Sample point generator.
polybasis: type of polynomial basis for interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
interpRcond: Tolerance for interpolation via numpy.polyfit.
robustTol: Tolerance for robust Pade' denominator management.
E: Complete derivative depth level of S.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uAppReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApp as sampleList.
lastSolvedAppReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApp: Approximate solution(s) with parameter(s) lastSolvedApp as
sampleList.
lastSolvedApp: Parameter(s) corresponding to last computed approximate
solution(s) as parameterList.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
- self._addParametersToList(["polybasis", "M", "N", "interpRcond",
- "robustTol"], ["MONOMIAL", 0, 0, -1, 0])
+ self._addParametersToList(["polybasis", "M", "N", "E", "interpRcond",
+ "robustTol"], ["MONOMIAL", 0, 0, -1, -1, 0])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Prescribed polybasis not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def interpRcond(self):
"""Value of interpRcond."""
return self._interpRcond
@interpRcond.setter
def interpRcond(self, interpRcond):
self._interpRcond = interpRcond
self._approxParameters["interpRcond"] = self.interpRcond
@property
def M(self):
"""Value of M. Its assignment may change S."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
- if hasattr(self, "E") and self.E < self.M:
+ if hasattr(self, "_E") and self.E < self.M:
RROMPyWarning("Prescribed S is too small. Decreasing M.")
self.M = self.E
@property
def N(self):
"""Value of N. Its assignment may change S."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
- if hasattr(self, "E") and self.E < self.N:
+ if hasattr(self, "_E") and self.E < self.N:
RROMPyWarning("Prescribed S is too small. Decreasing N.")
self.N = self.E
+ @property
+ def E(self):
+ """Value of E."""
+ return self._E
+ @E.setter
+ def E(self, E):
+ if E < 0:
+ if not hasattr(self, "_S"):
+ raise RROMPyException(("Value of E must be positive if S is "
+ "not yet assigned."))
+ E = np.sum(hashI(np.prod(self.S), self.npar)) - 1
+ self._E = E
+ self._approxParameters["E"] = self.E
+ if (hasattr(self, "_S")
+ and self.E >= np.sum(hashI(np.prod(self.S), self.npar))):
+ RROMPyWarning("Prescribed S is too small. Decreasing E.")
+ self.E = -1
+ if hasattr(self, "_M"): self.M = self.M
+ if hasattr(self, "_N"): self.N = self.N
+
@property
def robustTol(self):
"""Value of tolerance for robust Pade' denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
GenericDistributedApproximant.S.fset(self, S)
- self.E = np.sum(hashI(np.prod(self.S), self.npar)) - 1
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
+ if hasattr(self, "_E"): self.E = self.E
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musUniqueCN = None
self._derIdxs = None
self._reorder = None
def _setupInterpolationIndices(self):
"""Setup parameters for polyvander."""
RROMPyAssert(self._mode,
message = "Cannot setup interpolation indices.")
if self._musUniqueCN is None or len(self._reorder) != len(self.mus):
self._musUniqueCN, musIdxsTo, musIdxs, musCount = (
self.centerNormalize(self.mus).unique(return_index = True,
return_inverse = True,
return_counts = True))
self._musUnique = self.mus[musIdxsTo]
self._derIdxs = [None] * len(self._musUniqueCN)
self._reorder = np.empty(len(musIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musCount):
self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1],
cnt)
jIdx = np.nonzero(musIdxs == j)[0]
self._reorder[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupDenominator(self):
"""Compute Pade' denominator."""
RROMPyAssert(self._mode, message = "Cannot setup denominator.")
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of denominator.",
timestamp = self.timestamp)
while self.N > 0:
invD = self._computeInterpolantInverseBlocks()
if self.POD:
ev, eV = self.findeveVGQR(self.samplingEngine.RPOD, invD)
else:
ev, eV = self.findeveVGExplicit(self.samplingEngine.samples,
invD)
newParams = checkRobustTolerance(ev, self.N, self.robustTol)
if not newParams:
break
self.approxParameters = newParams
if self.N <= 0:
self._N = 0
eV = np.ones((1, 1))
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing denominator.",
timestamp = self.timestamp)
q = np.zeros(tuple([self.N + 1] * self.npar), dtype = eV.dtype)
for j in range(eV.shape[0]):
q[tuple(hashI(j, self.npar))] = eV[j, 0]
return q
def _setupNumerator(self):
"""Compute Pade' numerator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of numerator.",
timestamp = self.timestamp)
Qevaldiag = np.zeros((len(self.mus), len(self.mus)),
dtype = np.complex)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
self._setupInterpolationIndices()
idxGlob = 0
for j, derIdxs in enumerate(self._derIdxs):
nder = len(derIdxs)
idxLoc = np.arange(len(self.mus))[(self._reorder >= idxGlob)
* (self._reorder < idxGlob + nder)]
idxGlob += nder
Qval = [0] * nder
for der in range(nder):
derIdx = hashI(der, self.npar)
Qval[der] = (self.trainedModel.getQVal(
self._musUnique[j], derIdx,
scl = np.power(self.scaleFactor, -1.))
/ multifactorial(derIdx))
for derU, derUIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)]
if all([x >= 0 for x in diffIdx]):
diffj = hashD(diffIdx)
Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj]
self.trainedModel.verbosity = verb
while self.M >= 0:
fitVander, _, argIdxs = homogeneizedpolyvander(self._musUniqueCN,
self.M, self.polybasis,
self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
fitVander = fitVander[:, argIdxs]
fitOut = customFit(fitVander, Qevaldiag, full = True,
rcond = self.interpRcond)
if self.verbosity >= 5:
condfit = fitOut[1][2][0] / fitOut[1][2][-1]
verbosityDepth("MAIN", ("Fitting {} samples with degree {} "
"through {}... Conditioning of LS "
"system: {:.4e}.").format(
fitVander.shape[0], self.M,
polyfitname(self.polybasis),
condfit),
timestamp = self.timestamp)
if fitOut[1][1] == fitVander.shape[1]:
P = fitOut[0]
break
RROMPyWarning("Polyfit is poorly conditioned. Reducing M by 1.")
self.M -= 1
if self.M < 0:
raise RROMPyException(("Instability in computation of numerator. "
"Aborting."))
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing numerator.",
timestamp = self.timestamp)
p = np.zeros(tuple([self.M + 1] * self.npar) + (P.shape[1],),
dtype = P.dtype)
for j in range(P.shape[0]):
p[tuple(hashI(j, self.npar))] = P[j, :]
return p.T
def setupApprox(self):
"""
Compute Pade' interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeScaleFactor()
self.computeSnapshots()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samples,
self.HFEngine.rescalingExp)
data.polytype = self.polybasis
data.scaleFactor = self.scaleFactor
data.mus = copy(self.mus)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
if self.N > 0:
Q = self._setupDenominator()
else:
Q = np.ones(1, dtype = np.complex)
self.trainedModel.data.Q = copy(Q)
P = self._setupNumerator()
if self.POD:
P = np.tensordot(self.samplingEngine.RPOD, P, axes = ([-1], [0]))
self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def _computeInterpolantInverseBlocks(self) -> List[Np2D]:
"""
Compute inverse factors for minimal interpolant target functional.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self._setupInterpolationIndices()
eWidth = (hashD([self.E + 1] + [0] * (self.npar - 1))
- hashD([self.E] + [0] * (self.npar - 1)))
TE, _, argIdxs = homogeneizedpolyvander(self._musUniqueCN, self.E,
self.polybasis, self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
fitOut = customPInv(TE[:, argIdxs], rcond = self.interpRcond,
full = True)
self._fitinv = fitOut[0][- eWidth : , :]
if self.verbosity >= 5:
condfit = fitOut[1][1][0] / fitOut[1][1][-1]
verbosityDepth("MAIN", ("Fitting {} samples with degree {} "
"through {}... Conditioning of "
"pseudoinverse system: {:.4e}.").format(
TE.shape[0], self.E,
polyfitname(self.polybasis),
condfit),
timestamp = self.timestamp)
TN, _, argIdxs = homogeneizedpolyvander(self._musUniqueCN, self.N,
self.polybasis, self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
TN = TN[:, argIdxs]
invD = [None] * (eWidth)
for k in range(eWidth):
pseudoInv = np.diag(self._fitinv[k, :])
idxGlob = 0
for j, derIdxs in enumerate(self._derIdxs):
nder = len(derIdxs)
idxGlob += nder
if nder > 1:
idxLoc = np.arange(len(self.mus))[
(self._reorder >= idxGlob - nder)
* (self._reorder < idxGlob)]
invLoc = self._fitinv[k, idxLoc]
pseudoInv[np.ix_(idxLoc, idxLoc)] = 0.
for diffj, diffjIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
derUIdx = [x - y for (x, y) in
zip(diffjIdx, derQIdx)]
if all([x >= 0 for x in derUIdx]):
derU = hashD(derUIdx)
pseudoInv[idxLoc[derU], idxLoc[derQ]] = (
invLoc[diffj])
invD[k] = pseudoInv.dot(TN)
return invD
def findeveVGExplicit(self, sampleE:sampList,
invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of Pade' denominator
matrix.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
eWidth = len(invD)
if self.verbosity >= 10:
verbosityDepth("INIT", "Building gramian matrix.",
timestamp = self.timestamp)
gramian = self.HFEngine.innerProduct(sampleE, sampleE)
G = np.zeros((nEnd, nEnd), dtype = np.complex)
for k in range(eWidth):
G += invD[k].T.conj().dot(gramian.dot(invD[k]))
if self.verbosity >= 10:
verbosityDepth("DEL", "Done building gramian.",
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving eigenvalue problem for "
"gramian matrix."),
timestamp = self.timestamp)
ev, eV = np.linalg.eigh(G)
if self.verbosity >= 5:
try: condev = ev[-1] / ev[0]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved eigenvalue problem of "
"size {} with condition number "
"{:.4e}.").format(nEnd, condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return ev, eV
def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of Pade' denominator matrix
through SVD of R factor.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
S = RPODE.shape[0]
eWidth = len(invD)
if self.verbosity >= 10:
verbosityDepth("INIT", "Building half-gramian matrix stack.",
timestamp = self.timestamp)
Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex)
for k in range(eWidth):
Rstack[k * S : (k + 1) * S, :] = RPODE.dot(invD[k])
if self.verbosity >= 10:
verbosityDepth("DEL", "Done building half-gramian.",
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving svd for square root of "
"gramian matrix."),
timestamp = self.timestamp)
_, s, eV = np.linalg.svd(Rstack, full_matrices = False)
ev = s[::-1]
eV = eV[::-1, :].T.conj()
if self.verbosity >= 5:
try: condev = s[0] / s[-1]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved svd problem of size {} x "
"{} with condition number "
"{:.4e}.").format(*Rstack.shape, condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving svd.",
timestamp = self.timestamp)
return ev, eV
def centerNormalize(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Normalized parameter.
"""
return self.trainedModel.centerNormalize(mu, mu0)
def getResidues(self) -> Np1D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues()
diff --git a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
index 9e27318..0ec150b 100644
--- a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
+++ b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
@@ -1,416 +1,416 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from .generic_distributed_greedy_approximant import \
GenericDistributedGreedyApproximant
from rrompy.utilities.poly_fitting.polynomial import polybases, polydomcoeff
from rrompy.reduction_methods.distributed import RationalInterpolant
from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import Np1D, Np2D, DictAny, HFEng, paramVal
from rrompy.utilities.base import verbosityDepth
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert
__all__ = ['RationalInterpolantGreedy']
class RationalInterpolantGreedy(GenericDistributedGreedyApproximant,
RationalInterpolant):
"""
ROM greedy rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of training points to be exhausted
before training set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to Chebyshev sampler within muBounds;
- 'polybasis': type of basis for interpolation; allowed values
include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to
'MONOMIAL';
- 'Delta': difference between M and N in rational approximant;
defaults to 0;
- 'errorEstimatorKind': kind of error estimator; available values
include 'EXACT', 'BASIC', and 'BARE'; defaults to 'EXACT';
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust Pade' denominator management;
defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator;
- 'Delta': difference between M and N in rational approximant;
- 'errorEstimatorKind': kind of error estimator;
- 'interpRcond': tolerance for interpolation;
- 'robustTol': tolerance for robust Pade' denominator management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
robustTol: tolerance for robust Pade' denominator management.
Delta: difference between M and N in rational approximant.
errorEstimatorKind: kind of error estimator.
interpRcond: tolerance for interpolation.
robustTol: tolerance for robust Pade' denominator management.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uAppReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApp as sampleList.
lastSolvedAppReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApp: Approximate solution(s) with parameter(s) lastSolvedApp as
sampleList.
lastSolvedApp: Parameter(s) corresponding to last computed approximate
solution(s) as parameterList.
"""
_allowedEstimatorKinds = ["EXACT", "BASIC", "BARE"]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["Delta", "polybasis", "errorEstimatorKind",
"interpRcond", "robustTol"],
- [0, "MONOMIAL", "EXACT", -1, 0])
+ [0, "MONOMIAL", "EXACT", -1, 0],
+ toBeExcluded = ["E"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
if self.verbosity >= 7:
verbosityDepth("INIT", "Computing Taylor blocks of system.",
timestamp = self.timestamp)
nAs = self.HFEngine.nAs - 1
nbs = max(self.HFEngine.nbs, (nAs + 1) * self.homogeneized)
self.As = [self.HFEngine.A(self.mu0, j + 1) for j in range(nAs)]
self.bs = [self.HFEngine.b(self.mu0, j, self.homogeneized)
for j in range(nbs)]
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing Taylor blocks.",
timestamp = self.timestamp)
self._postInit()
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Sample type not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def Delta(self):
"""Value of Delta."""
return self._Delta
@Delta.setter
def Delta(self, Delta):
if not np.isclose(Delta, np.floor(Delta)):
raise RROMPyException("Delta must be an integer.")
if Delta < 0:
RROMPyWarning(("Error estimator unreliable for Delta < 0. "
"Overloading of errorEstimator is suggested."))
else:
Deltamin = (max(self.HFEngine.nbs,
self.HFEngine.nAs * self.homogeneized)
- 1 - 1 * (self.HFEngine.nAs > 1))
if Delta < Deltamin:
RROMPyWarning(("Method may be unreliable for selected Delta. "
"Suggested minimal value of Delta: {}.").format(
Deltamin))
self._Delta = Delta
self._approxParameters["Delta"] = self.Delta
@property
def errorEstimatorKind(self):
"""Value of errorEstimatorKind."""
return self._errorEstimatorKind
@errorEstimatorKind.setter
def errorEstimatorKind(self, errorEstimatorKind):
errorEstimatorKind = errorEstimatorKind.upper()
if errorEstimatorKind not in self._allowedEstimatorKinds:
RROMPyWarning(("Error estimator kind not recognized. Overriding "
"to 'EXACT'."))
errorEstimatorKind = "EXACT"
self._errorEstimatorKind = errorEstimatorKind
self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind
@property
def nTestPoints(self):
"""Value of nTestPoints."""
return self._nTestPoints
@nTestPoints.setter
def nTestPoints(self, nTestPoints):
if nTestPoints <= np.abs(self.Delta):
RROMPyWarning(("nTestPoints must be at least abs(Delta) + 1. "
"Increasing value to abs(Delta) + 1."))
nTestPoints = np.abs(self.Delta) + 1
if not np.isclose(nTestPoints, np.int(nTestPoints)):
raise RROMPyException("nTestPoints must be an integer.")
nTestPoints = np.int(nTestPoints)
if hasattr(self, "_nTestPoints") and self.nTestPoints is not None:
nTestPointsold = self.nTestPoints
else: nTestPointsold = -1
self._nTestPoints = nTestPoints
self._approxParameters["nTestPoints"] = self.nTestPoints
if nTestPointsold != self.nTestPoints:
self.resetSamples()
def _errorSamplingRatio(self, mus:Np1D, muRTest:Np1D,
muRTrain:Np1D) -> Np1D:
"""Scalar ratio in explicit error estimator."""
self.setupApprox()
testTile = np.tile(np.reshape(muRTest, (-1, 1)), [1, len(muRTrain)])
nodalVals = np.prod(testTile - np.reshape(muRTrain, (1, -1)), axis = 1)
denVals = self.trainedModel.getQVal(mus)
return np.abs(nodalVals / denVals)
def _RHSNorms(self, radiusb0:Np2D) -> Np1D:
"""High fidelity system RHS norms."""
self.assembleReducedResidualBlocks(full = False)
# 'ij,jk,ik->k', resbb, radiusb0, radiusb0.conj()
b0resb0 = np.sum(self.trainedModel.data.resbb.dot(radiusb0)
* radiusb0.conj(), axis = 0)
RHSnorms = np.power(np.abs(b0resb0), .5)
return RHSnorms
def _errorEstimatorBare(self) -> Np1D:
"""Bare residual-based error estimator."""
self.setupApprox()
self.assembleReducedResidualGramian(self.trainedModel.data.projMat)
pDom = self.trainedModel.data.P[:, -1]
LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom))
Adiag = self.As[0].diagonal()
LL = ((self.scaleFactor[0] * np.linalg.norm(Adiag)) ** 2. * LL
/ np.size(Adiag))
scalingDom = polydomcoeff(len(self.mus) - 1, self.polybasis)
return scalingDom * np.power(np.abs(LL), .5)
def _errorEstimatorBasic(self, muTest:paramVal, ratioTest:complex) -> Np1D:
"""Basic residual-based error estimator."""
resmu = self.HFEngine.residual(self.trainedModel.getApprox(muTest),
muTest, self.homogeneized)[0]
return np.abs(self.estimatorNormEngine.norm(resmu) / ratioTest)
def _errorEstimatorExact(self, muRTrain:Np1D, vanderBase:Np2D) -> Np1D:
"""Exact residual-based error estimator."""
self.setupApprox()
self.assembleReducedResidualBlocks(full = True)
nAs = self.HFEngine.nAs - 1
nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized)
delta = len(self.mus) - len(self.trainedModel.data.Q)
nbsEff = max(0, nbs - delta)
momentQ = np.zeros(nbsEff, dtype = np.complex)
momentQu = np.zeros((len(self.mus), nAs), dtype = np.complex)
radiusbTen = np.zeros((nbsEff, nbsEff, vanderBase.shape[1]),
dtype = np.complex)
radiusATen = np.zeros((nAs, nAs, vanderBase.shape[1]),
dtype = np.complex)
if nbsEff > 0:
momentQ[0] = self.trainedModel.data.Q[-1]
radiusbTen[0, :, :] = vanderBase[: nbsEff, :]
momentQu[:, 0] = self.trainedModel.data.P[:, -1]
radiusATen[0, :, :] = vanderBase[: nAs, :]
Qvals = self.trainedModel.getQVal(self.mus)
for k in range(1, max(nAs, nbs * (nbsEff > 0))):
Qvals = Qvals * muRTrain
if k > delta and k < nbs:
momentQ[k - delta] = self._fitinv.dot(Qvals)
radiusbTen[k - delta, k :, :] = (
radiusbTen[0, : delta - k, :])
if k < nAs:
momentQu[:, k] = Qvals * self._fitinv
radiusATen[k, k :, :] = radiusATen[0, : - k, :]
if self.POD and nAs > 1:
momentQu[:, 1 :] = self.samplingEngine.RPOD.dot(
momentQu[:, 1 :])
radiusA = np.tensordot(momentQu, radiusATen, 1)
if nbsEff > 0:
radiusb = np.tensordot(momentQ, radiusbTen, 1)
# 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
ff = np.sum(self.trainedModel.data.resbb[delta + 1 :, delta + 1 :]\
.dot(radiusb) * radiusb.conj(), axis = 0)
# 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
Lf = np.sum(np.tensordot(
self.trainedModel.data.resAb[delta :, :, :], radiusA, 2)
* radiusb.conj(), axis = 0)
else:
ff, Lf = 0., 0.
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2)
* radiusA.conj(), axis = (0, 1))
scalingDom = polydomcoeff(len(self.mus) - 1, self.polybasis)
return scalingDom * np.power(np.abs(ff - 2. * np.real(Lf) + LL), .5)
def errorEstimator(self, mus:Np1D) -> Np1D:
"""Standard residual-based error estimator."""
self.setupApprox()
if (np.any(np.isnan(self.trainedModel.data.P[:, -1]))
or np.any(np.isinf(self.trainedModel.data.P[:, -1]))):
err = np.empty(len(mus))
err[:] = np.inf
return err
nAs = self.HFEngine.nAs - 1
nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized)
muRTest = self.centerNormalize(mus)(0)
muRTrain = self.centerNormalize(self.mus)(0)
samplingRatio = self._errorSamplingRatio(mus, muRTest, muRTrain)
vanderBase = np.polynomial.polynomial.polyvander(muRTest,
max(nAs, nbs)).T
RHSnorms = self._RHSNorms(vanderBase[: nbs + 1, :])
if self.errorEstimatorKind == "BARE":
jOpt = self._errorEstimatorBare()
elif self.errorEstimatorKind == "BASIC":
idx_muTestSample = np.argmax(samplingRatio)
jOpt = self._errorEstimatorBasic(mus[idx_muTestSample],
samplingRatio[idx_muTestSample])
else: #if self.errorEstimatorKind == "EXACT":
jOpt = self._errorEstimatorExact(muRTrain, vanderBase[: -1, :])
return jOpt * samplingRatio / RHSnorms
def setupApprox(self, plotEst : bool = False):
"""
Compute Pade' interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeScaleFactor()
self.greedy(plotEst)
- self._N = len(self.mus) - 1
- self._M = copy(self._N)
- self.E = copy(self._N)
+ self._S = len(self.mus)
+ self._N, self._M, self._E = [self._S - 1] * 3
if self.Delta < 0:
self._M += self.Delta
else:
self._N -= self.Delta
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samples,
self.HFEngine.rescalingExp)
data.polytype = self.polybasis
data.scaleFactor = self.scaleFactor
data.mus = copy(self.mus)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
self.trainedModel.data.mus = copy(self.mus)
if min(self.M, self.N) < 0:
if self.verbosity >= 5:
verbosityDepth("MAIN", "Minimal sample size not achieved.",
timestamp = self.timestamp)
Q = np.empty(max(self.N, 0) + 1, dtype = np.complex)
P = np.empty((len(self.mus), max(self.M, 0) + 1),
dtype = np.complex)
Q[:] = np.nan
P[:] = np.nan
self.trainedModel.data.Q = copy(Q)
self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Aborting computation of approximant.",
timestamp = self.timestamp)
return
if self.N > 0:
Q = self._setupDenominator()
else:
Q = np.ones(1, dtype = np.complex)
self.trainedModel.data.Q = copy(Q)
P = self._setupNumerator()
if self.POD:
P = np.tensordot(self.samplingEngine.RPOD, P, axes = ([-1], [0]))
self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def assembleReducedResidualBlocks(self, full : bool = False):
"""Build affine blocks of reduced linear system through projections."""
scaling = self.trainedModel.data.scaleFactor[0]
self.assembleReducedResidualBlocksbb(self.bs, scaling)
if full:
pMat = self.trainedModel.data.projMat
self.assembleReducedResidualBlocksAb(self.As, self.bs[1 :],
pMat, scaling)
self.assembleReducedResidualBlocksAA(self.As, pMat, scaling)
diff --git a/rrompy/utilities/base/__init__.py b/rrompy/utilities/base/__init__.py
index c6c8528..6bcf126 100644
--- a/rrompy/utilities/base/__init__.py
+++ b/rrompy/utilities/base/__init__.py
@@ -1,53 +1,55 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from .find_dict_str_key import findDictStrKey
from .get_new_filename import getNewFilename
from .kroneckerer import kroneckerer
from .factorials import multibinom, multifactorial
from .pickle_utilities import pickleDump, pickleLoad
from .purge_dict import purgeDict
from .purge_list import purgeList
from .number_theory import (squareResonances, primeFactorize,
getLowestPrimeFactor)
+from .halton import haltonGenerate
from .sobol import sobolGenerate
from .low_discrepancy import vanderCorput, lowDiscrepancy
from . import types as Types
from .verbosity_depth import verbosityDepth
__all__ = [
'findDictStrKey',
'getNewFilename',
'kroneckerer',
'multibinom',
'multifactorial',
'pickleDump',
'pickleLoad',
'purgeDict',
'purgeList',
'squareResonances',
'primeFactorize',
'getLowestPrimeFactor',
+ 'haltonGenerate',
'sobolGenerate',
'vanderCorput',
'lowDiscrepancy',
'Types',
'verbosityDepth'
]
diff --git a/rrompy/utilities/base/halton.py b/rrompy/utilities/base/halton.py
new file mode 100644
index 0000000..fe1e24c
--- /dev/null
+++ b/rrompy/utilities/base/halton.py
@@ -0,0 +1,52 @@
+# Copyright (C) 2018 by the RROMPy authors
+#
+# This file is part of RROMPy.
+#
+# RROMPy is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# RROMPy is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with RROMPy. If not, see .
+#
+
+import numpy as np
+
+__all__ = ['haltonGenerate']
+
+def next_prime():
+ def is_prime(num):
+ "Checks if num is a prime value"
+ for i in range(2,int(num**0.5)+1):
+ if(num % i)==0: return False
+ return True
+ prime = 3
+ while(1):
+ if is_prime(prime):
+ yield prime
+ prime += 2
+
+def vdc(n, base=2):
+ vdc, denom = 0, 1
+ while n:
+ denom *= base
+ n, remainder = divmod(n, base)
+ vdc += remainder/float(denom)
+ return vdc
+
+def haltonGenerate(dim, size, seed = 0, step = 2):
+ seq = np.empty((size, dim), dtype = float)
+ primeGen = next_prime()
+ next(primeGen)
+ for d in range(dim):
+ base = next(primeGen)
+ for i in range(seed, step * size + seed, step):
+ seq[(i - seed) // step, d] = vdc(i, base)
+ return seq
+
diff --git a/rrompy/utilities/base/low_discrepancy.py b/rrompy/utilities/base/low_discrepancy.py
index 47f83ca..74ce9d7 100644
--- a/rrompy/utilities/base/low_discrepancy.py
+++ b/rrompy/utilities/base/low_discrepancy.py
@@ -1,46 +1,47 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from rrompy.utilities.base.types import Np1D
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['vanderCorput', 'lowDiscrepancy']
def vanderCorput(n:int) -> Np1D:
if n <= 0:
raise RROMPyException("Only positive integers allowed.")
x = [0] * n
ln = int(np.ceil(np.log2(n)))
for j in range(n):
x[j] = int(np.binary_repr(j, width = ln)[::-1], 2)
return x
-def lowDiscrepancy(n:int) -> Np1D:
+def lowDiscrepancy(n:int, inverse : bool = False) -> Np1D:
if n <= 0:
raise RROMPyException("Only positive integers allowed.")
x = [0] * n
max2Pow = np.binary_repr(n)[::-1].find('1')
max2Fac = 2 ** max2Pow
res2 = n // max2Fac
xBase = res2 * np.array(vanderCorput(max2Fac), dtype = np.int)
stride = ((n - 1) // 2 - 1) * (max2Pow == 0) + 1
for i in range(res2):
x[i * max2Fac : (i + 1) * max2Fac] = (i * stride + xBase) % n
+ if inverse: x = np.argsort(x)
return x
diff --git a/rrompy/utilities/poly_fitting/polynomial/homogeneization.py b/rrompy/utilities/poly_fitting/polynomial/homogeneization.py
index bbcee6e..f42d5e1 100644
--- a/rrompy/utilities/poly_fitting/polynomial/homogeneization.py
+++ b/rrompy/utilities/poly_fitting/polynomial/homogeneization.py
@@ -1,55 +1,55 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List, paramList
-from rrompy.utilities.poly_fitting.polynomial import (polyvander,
- nextDerivativeIndices)
+from rrompy.utilities.poly_fitting.polynomial import polyvander
from rrompy.parameter import checkParameterList
__all__ = ['homogeneizationMask', 'homogeneizedpolyvander']
def homogeneizationMask(degs:List[int]) -> Tuple[List[int], List[bool]]:
dim = len(degs)
N = np.prod([d + 1 for d in degs])
maskN = np.arange(N, dtype = int)
remN = np.zeros((N, dim), dtype = int)
for j in range(dim - 1, -1, -1):
remN[:, j] = maskN % (degs[j] + 1)
maskN //= degs[j] + 1
mask = np.sum(remN, axis = 1) <= min(degs)
return remN[mask], mask
def homogeneizedpolyvander(x:paramList, deg:int, basis:str,
derIdxs : List[List[List[int]]] = None,
reorder : List[int] = None, scl : Np1D = None)\
-> Tuple[Np2D, List[List[int]], List[int]]:
x, _ = checkParameterList(x)
degs = [deg] * x.shape[1]
VanBase = polyvander(x, degs, basis, derIdxs, reorder, scl)
derIdxs, mask = homogeneizationMask(degs)
- # FIXME: improve computation of ordering indices
+
ordIdxs = np.empty(len(derIdxs), dtype = int)
- derOrdered = nextDerivativeIndices([], x.shape[1], len(derIdxs))
- for j, derOrd in enumerate(derOrdered):
- for k, dIdx in enumerate(derIdxs):
- if np.allclose(dIdx, derOrd):
- ordIdxs[j] = k
- break
+ derTotal = np.array([np.sum(y) for y in derIdxs])
+ idxPrev = 0
+ rangeAux = np.arange(len(derIdxs))
+ for j in range(deg + 1):
+ idxLocal = rangeAux[derTotal == j][::-1]
+ idxPrev += len(idxLocal)
+ ordIdxs[idxPrev - len(idxLocal) : idxPrev] = idxLocal
return VanBase[:, mask], derIdxs, ordIdxs
diff --git a/tests/test_3_reduction_methods_1D/rational_pade_1d.py b/tests/test_3_reduction_methods_1D/rational_pade_1d.py
index 79e189f..99e9aaf 100644
--- a/tests/test_3_reduction_methods_1D/rational_pade_1d.py
+++ b/tests/test_3_reduction_methods_1D/rational_pade_1d.py
@@ -1,86 +1,86 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import os
import numpy as np
from matrix_fft import matrixFFT
from rrompy.reduction_methods.centered import RationalPade as RP
def test_rho():
mu = 1.5
mu0 = 2. + 1.j
solver = matrixFFT()
uh = solver.solve(mu)[0]
params = {"POD": False, "M": 9, "N": 10, "S": 11, "robustTol": 1e-6}
approx = RP(solver, mu0, params, verbosity = 0)
approx.setupApprox()
if not os.path.isdir("./.pytest_cache"): os.mkdir("./.pytest_cache")
filesOut = [x for x in os.listdir("./.pytest_cache") if
(x[-4:] == ".pkl" and x[:6] == "outPad")]
for fileOut in filesOut: os.remove("./.pytest_cache/" + fileOut)
fileStored = approx.storeTrainedModel(".pytest_cache/outPad")
filesOut = [x for x in os.listdir("./.pytest_cache") if
(x[-4:] == ".pkl" and x[:6] == "outPad")]
assert len(filesOut) == 1
assert filesOut[0] == fileStored[- len(filesOut[0]) :]
uhP1 = approx.getApprox(mu)[0]
errP = approx.getErr(mu)[0]
errNP = approx.normErr(mu)[0]
myerrP = uhP1 - uh
assert np.allclose(np.abs(errP - myerrP), 0., rtol = 1e-3)
assert np.isclose(solver.norm(errP), errNP, rtol = 1e-3)
resP = approx.getRes(mu)[0]
resNP = approx.normRes(mu)
assert np.isclose(solver.norm(resP), resNP, rtol = 1e-3)
assert np.allclose(np.abs(resP - (solver.b(mu) - solver.A(mu).dot(uhP1))),
0., rtol = 1e-3)
del approx
approx = RP(solver, mu0, {"S": 3}, verbosity = 0)
approx.loadTrainedModel(fileStored)
for fileOut in filesOut: os.remove("./.pytest_cache/" + fileOut)
uhP2 = approx.getApprox(mu)[0]
assert np.allclose(np.abs(uhP1 - uhP2), 0., rtol = 1e-3)
def test_E_warn(capsys):
mu = 1.5
mu0 = 2. + 1.j
solver = matrixFFT()
uh = solver.solve(mu)[0]
params = {"POD": True, "M": 14, "N": 15, "S": 10}
approx = RP(solver, mu0, params, verbosity = 0)
approx.setupApprox()
out, err = capsys.readouterr()
- assert "Prescribed S is too small. Decreasing M." in out
- assert "Prescribed S is too small. Decreasing N." in out
+ assert "Prescribed E is too small. Decreasing M." in out
+ assert "Prescribed E is too small. Decreasing N." in out
assert len(err) == 0
uhP = approx.getApprox(mu)[0]
errP = approx.getErr(mu)[0]
errNP = approx.normErr(mu)[0]
assert np.allclose(np.abs(errP - (uhP - uh)), 0., rtol = 1e-3)
assert np.isclose(errNP, 3.5197568e-07, rtol = 1e-1)
poles, ress = approx.getResidues()
condres = np.linalg.cond(solver.innerProduct(ress, ress))
assert np.isclose(condres, 192.1791778, rtol = 1e-3)
assert np.isclose(np.min(np.abs(poles - 2.)), 0., atol = 1e-3)
assert np.isclose(np.min(np.abs(poles - 1.)), 0., atol = 1e-2)
assert np.isclose(np.min(np.abs(poles - 3.)), 0., atol = 1e-2)
diff --git a/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py b/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py
index ed14275..4aef1c4 100644
--- a/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py
+++ b/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py
@@ -1,77 +1,77 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from matrix_random import matrixRandom
from rrompy.reduction_methods.distributed import RationalInterpolant as RI
from rrompy.parameter.parameter_sampling import (QuadratureSampler as QS,
ManualSampler as MS)
def test_monomials(capsys):
mu = [5.05, 7.1]
mu0 = [5., 7.]
solver = matrixRandom()
uh = solver.solve(mu)[0]
params = {"POD": False, "M": 4, "N": 4, "S": [4, 4], "robustTol": 1e-6,
"interpRcond": 1e-3, "polybasis": "MONOMIAL",
"sampler": QS([[4.9, 6.85], [5.1, 7.15]], "UNIFORM")}
approx = RI(solver, mu0, params, verbosity = 0)
approx.setupApprox()
uhP1 = approx.getApprox(mu)[0]
errP = approx.getErr(mu)[0]
errNP = approx.normErr(mu)[0]
myerrP = uhP1 - uh
assert np.allclose(np.abs(errP - myerrP), 0., rtol = 1e-3)
assert np.isclose(solver.norm(errP), errNP, rtol = 1e-3)
resP = approx.getRes(mu)[0]
resNP = approx.normRes(mu)
assert np.isclose(solver.norm(resP), resNP, rtol = 1e-3)
assert np.allclose(np.abs(resP - (solver.b(mu) - solver.A(mu).dot(uhP1))),
0., rtol = 1e-3)
assert np.isclose(errNP / solver.norm(uh), 7.537e-4, rtol = 1e-1)
out, err = capsys.readouterr()
assert ("poorly conditioned. Reducing M" in out)
assert len(err) == 0
def test_well_cond():
mu = [5.05, 7.1]
mu0 = [5., 7.]
solver = matrixRandom()
params = {"POD": True, "M": 3, "N": 3, "S": [4, 4],
"interpRcond": 1e-10, "polybasis": "CHEBYSHEV",
"sampler": QS([[4.9, 6.85], [5.1, 7.15]], "UNIFORM")}
approx = RI(solver, mu0, params, verbosity = 0)
approx.setupApprox()
print(approx.normErr(mu)[0] / approx.normHF(mu)[0])
assert np.isclose(approx.normErr(mu)[0] / approx.normHF(mu)[0],
8.46624e-3, rtol = 1e-1)
def test_hermite():
mu = [5.05, 7.1]
mu0 = [5., 7.]
solver = matrixRandom()
sampler0 = QS([[4.9, 6.85], [5.1, 7.15]], "UNIFORM")
params = {"POD": True, "M": 3, "N": 3, "S": [25], "polybasis": "CHEBYSHEV",
"sampler": MS([[4.9, 6.85], [5.1, 7.15]],
points = sampler0.generatePoints([3, 3]))}
approx = RI(solver, mu0, params, verbosity = 0)
approx.setupApprox()
assert np.isclose(approx.normErr(mu)[0] / approx.normHF(mu)[0],
- 1.449341e-4, rtol = 1e-1)
+ 2.36598e-4, rtol = 1e-1)
diff --git a/tests/test_4_reduction_methods_multiD/rational_pade_2d.py b/tests/test_4_reduction_methods_multiD/rational_pade_2d.py
index 1db8384..01ea5b7 100644
--- a/tests/test_4_reduction_methods_multiD/rational_pade_2d.py
+++ b/tests/test_4_reduction_methods_multiD/rational_pade_2d.py
@@ -1,64 +1,64 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from matrix_random import matrixRandom
from rrompy.reduction_methods.centered import RationalPade as RP
def test_rho():
mu = [5.5, 7.5]
mu0 = [5., 7.]
solver = matrixRandom()
uh = solver.solve(mu)[0]
params = {"POD": False, "M": 3, "N": 3, "S": 10, "robustTol": 1e-6}
approx = RP(solver, mu0, params, verbosity = 0)
approx.setupApprox()
uhP1 = approx.getApprox(mu)[0]
errP = approx.getErr(mu)[0]
errNP = approx.normErr(mu)[0]
myerrP = uhP1 - uh
assert np.allclose(np.abs(errP - myerrP), 0., rtol = 1e-3)
assert np.isclose(solver.norm(errP), errNP, rtol = 1e-3)
resP = approx.getRes(mu)[0]
resNP = approx.normRes(mu)
assert np.isclose(solver.norm(resP), resNP, rtol = 1e-3)
assert np.allclose(np.abs(resP - (solver.b(mu) - solver.A(mu).dot(uhP1))),
0., rtol = 1e-3)
def test_E_warn(capsys):
mu = [5.5, 7.5]
mu0 = [5., 7.]
solver = matrixRandom()
uh = solver.solve(mu)[0]
params = {"POD": True, "M": 3, "N": 4, "S": 10}
approx = RP(solver, mu0, params, verbosity = 0)
approx.setupApprox()
out, err = capsys.readouterr()
- assert "Prescribed S is too small. Decreasing M" not in out
- assert "Prescribed S is too small. Decreasing N" in out
+ assert "Prescribed E is too small. Decreasing M" not in out
+ assert "Prescribed E is too small. Decreasing N" in out
assert len(err) == 0
uhP = approx.getApprox(mu)[0]
uhNP = approx.normApprox(mu)[0]
errP = approx.getErr(mu)[0]
errNP = approx.normErr(mu)[0]
assert np.allclose(np.abs(errP - (uhP - uh)), 0., rtol = 1e-3)
assert np.isclose(errNP / uhNP, 1.41336e-2, rtol = 1e-1)