diff --git a/README.md b/README.md
index 70189c1..6f5b85e 100644
--- a/README.md
+++ b/README.md
@@ -1,43 +1,44 @@
# RROMPy -- Rational Reduced Order Modeling in Python
=====================================================
Module for the solution and rational model order reduction of parametric PDE-based problem. Coded in Python 3.6.
## Prerequisites
**RROMPy** requires
-* **numpy**;
-* **scipy**;
-* **fenics**.
+* **numpy** and **scipy**;
+* **fenics** and **mshr**;
+* **matplotlib**;
+* and other standard Python3 modules (**os**, **typing**, **time**, **datetime**, **abc**, **pickle**, **traceback**, and **itertools**).
Testing requires
* **pytest**.
### Fenics
-Most of the PDE engines already provided rely on [FEniCS](http://fenicsproject.org/). If you do not have FEniCS installed, you may want to create an [Anaconda3/Miniconda3](http://anaconda.org/) environment using the provided !!conda-fenics.yml!! environment file by running the command
+Most of the high fidelity problem engines already provided rely on [FEniCS](http://fenicsproject.org/). If you do not have FEniCS installed, you may want to create an [Anaconda3/Miniconda3](http://anaconda.org/) environment using the provided !!conda-fenics.yml!! environment file by running the command
```
conda env create --file conda-fenics.yml
```
-This will create an environment where Fenics can be used. In order to use FEniCS, the environment must be activated through
+This will create an environment where Fenics (and all other required modules) can be used. In order to use FEniCS, the environment must be activated through
```
source activate fenicsenv
```
## Installing
Clone the repository
```
git clone https://c4science.ch/source/RROMPy.git
```
enter the main folder and install the package by typing
```
python3 setup.py install
```
The installation can be tested with
```
python3 setup.py test
```
## License
This project is licensed under the GNU GENERAL PUBLIC LICENSE license - see the !!LICENSE!! file for details.
## Acknowledgments
Part of the funding that made this module possible has been provided by the Swiss National Science Foundation through the FNS Research Project No. 182236.
diff --git a/rrompy/hfengines/base/matrix_engine_base.py b/rrompy/hfengines/base/matrix_engine_base.py
index f213b9a..dfaad41 100644
--- a/rrompy/hfengines/base/matrix_engine_base.py
+++ b/rrompy/hfengines/base/matrix_engine_base.py
@@ -1,299 +1,324 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
import scipy.sparse as scsp
from matplotlib import pyplot as plt
+from copy import deepcopy as copy
from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, strLst, Tuple, List,
DictAny)
-from rrompy.utilities.base import purgeList, getNewFilename
+from rrompy.utilities.base import purgeList, getNewFilename, verbosityDepth
+from rrompy.utilities.exception_manager import RROMPyException
from rrompy.solver import setupSolver
__all__ = ['MatrixEngineBase']
class MatrixEngineBase:
"""
Generic solver for parametric matrix problems.
Attributes:
verbosity: Verbosity level.
As: Scipy sparse array representation (in CSC format) of As.
bs: Numpy array representation of bs.
bsH: Numpy array representation of homogeneized bs.
energyNormMatrix: Scipy sparse matrix representing inner product.
"""
nAs, nbs = 1, 1
rescalingExp = 1.
- functional = lambda self, u: 0.
def __init__(self, verbosity : int = 10, timestamp : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
self.resetAs()
self.resetbs()
self.setSolver("SPSOLVE", {"use_umfpack" : False})
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
@property
def nbsH(self) -> int:
return max(self.nbs, self.nAs)
def spacedim(self):
return self.As[0].shape[1]
def buildEnergyNormForm(self): # eye
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self.energyNormMatrix = np.eye(self.spacedim())
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False) -> Np2D:
"""Scalar product."""
if not hasattr(self, "energyNormMatrix"):
if self.verbosity >= 20:
verbosityDepth("INIT", "Assembling energy matrix.",
timestamp = self.timestamp)
self.buildEnergyNormForm()
if self.verbosity >= 20:
verbosityDepth("DEL", "Done assembling energy matrix.",
timestamp = self.timestamp)
if onlyDiag:
return np.sum(self.energyNormMatrix.dot(u) * v.conj(), axis = 0)
return v.T.conj().dot(self.energyNormMatrix.dot(u))
def norm(self, u:Np2D) -> Np1D:
return np.abs(self.innerProduct(u, u, onlyDiag = True)) ** .5
def checkAInBounds(self, der : int = 0):
"""Check if derivative index is oob for operator of linear system."""
if der < 0 or der >= self.nAs:
d = self.spacedim()
return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)),
shape = (d, d), dtype = np.complex)
def checkbInBounds(self, der : int = 0, homogeneized : bool = False):
"""Check if derivative index is oob for RHS of linear system."""
nbs = self.nbsH if homogeneized else self.nbs
if der < 0 or der >= nbs:
return np.zeros(self.spacedim(), dtype = np.complex)
def resetAs(self):
"""Reset (derivatives of) operator of linear system."""
self.resetbsH()
- self.As = [None] * self.nAs
+ self.setAs([None] * self.nAs)
def resetbs(self):
"""Reset (derivatives of) RHS of linear system."""
self.resetbsH()
- self.bs = [None] * self.nbs
+ self.setbs([None] * self.nbs)
def resetbsH(self):
"""Reset (derivatives of) homogeneized RHS of linear system."""
- self.bsH = [None] * self.nbsH
+ self.setbsH([None] * self.nbsH)
+
+ def setAs(self, As:List[Np2D]):
+ """Assign terms of operator of linear system."""
+ if len(As) != self.nAs:
+ RROMPyException(("Expected number {} of terms of As not matching "
+ "given list length {}.").format(self.nAs,
+ len(As)))
+ self.As = list([copy(A) for A in As])
+
+ def setbs(self, bs:List[Np1D]):
+ """Assign terms of RHS of linear system."""
+ if len(bs) != self.nbs:
+ RROMPyException(("Expected number {} of terms of bs not matching "
+ "given list length {}.").format(self.nbs,
+ len(bs)))
+ self.bs = list([copy(b) for b in bs])
+
+ def setbsH(self, bsH:List[Np1D]):
+ """Assign terms of homogeneized RHS of linear system."""
+ if len(bsH) != self.nbsH:
+ RROMPyException(("Expected number {} of terms of bsH not matching "
+ "given list length {}.").format(self.nbsH,
+ len(bsH)))
+ self.bsH = list([copy(bH) for bH in bsH])
def A(self, mu:complex, der : int = 0) -> ScOp:
"""Return (derivative of) operator of linear system."""
Anull = self.checkAInBounds(der)
if Anull is not None: return Anull
As0 = self.As[der]
coeff = 1.
for j in range(der + 1, self.nAs):
coeff = coeff * mu * j / (j - der)
As0 = As0 + coeff * self.As[j]
return As0
def affineLinearSystemA(self, mu : complex = 0.) -> List[Np2D]:
"""
Assemble affine blocks of operator of linear system (just linear
blocks).
"""
As = [None] * self.nAs
for j in range(self.nAs):
As[j] = self.A(mu, j)
return As
def affineWeightsA(self, mu : complex = 0.) -> callable:
"""
Assemble affine blocks of operator of linear system (just affine
weights). Stored as strings for the sake of pickling.
"""
lambdasA = ["np.ones_like(mu)"]
mu0Eff = np.power(mu, self.rescalingExp)
for j in range(1, self.nAs):
lambdasA += ["np.power(np.power(mu, {1}) - {2}, {0})".format(j,
self.rescalingExp,
mu0Eff)]
return lambdasA
def affineBlocksA(self, mu : complex = 0.) -> Tuple[List[Np2D], callable]:
"""Assemble affine blocks of operator of linear system."""
return self.affineLinearSystemA(mu), self.affineWeightsA(mu)
def b(self, mu:complex, der : int = 0,
homogeneized : bool = False) -> Np1D:
"""Return (derivative of) (homogeneized) RHS of linear system."""
bnull = self.checkbInBounds(der, homogeneized)
if bnull is not None: return bnull
bs = self.bsH if homogeneized else self.bs
b = bs[der]
coeff = 1.
for j in range(der + 1, len(bs)):
coeff = coeff * mu * j / (j - der)
b = b + coeff * bs[j]
return b
def affineLinearSystemb(self, mu : complex = 0.,
homogeneized : bool = False) -> List[Np1D]:
"""
Assemble affine blocks of RHS of linear system (just linear blocks).
"""
nbs = self.nbsH if homogeneized else self.nbs
bs = [None] * nbs
for j in range(nbs):
bs[j] = self.b(mu, j, homogeneized)
return bs
def affineWeightsb(self, mu : complex = 0., homogeneized : bool = False)\
-> callable:
"""
Assemble affine blocks of RHS of linear system (just affine weights).
Stored as strings for the sake of pickling.
"""
nbs = self.nbsH if homogeneized else self.nbs
lambdasb = ["np.ones_like(mu)"]
mu0Eff = np.power(mu, self.rescalingExp)
for j in range(1, nbs):
lambdasb += ["np.power(np.power(mu, {1}) - {2}, {0})".format(j,
self.rescalingExp,
mu0Eff)]
return lambdasb
def affineBlocksb(self, mu : complex = 0., homogeneized : bool = False)\
-> Tuple[List[Np1D], callable]:
"""Assemble affine blocks of RHS of linear system."""
return (self.affineLinearSystemb(mu, homogeneized),
self.affineWeightsb(mu, homogeneized))
def setSolver(self, solverType:str, solverArgs : DictAny = {}):
"""Choose solver type and parameters."""
self._solver, self._solverArgs = setupSolver(solverType, solverArgs)
def solve(self, mu:complex, RHS : Np1D = None,
homogeneized : bool = False) -> Np1D:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
"""
A = self.A(mu)
if RHS is None: RHS = self.b(mu, homogeneized = homogeneized)
return self._solver(A, RHS, self._solverArgs)
def residual(self, u:Np1D, mu:complex,
homogeneized : bool = False) -> Np1D:
"""
Find residual of linear system for given approximate solution.
Args:
u: numpy complex array with function dofs. If None, set to 0.
mu: parameter value.
"""
A = self.A(mu)
RHS = self.b(mu, homogeneized = homogeneized)
if u is None: return RHS
return RHS - A.dot(u)
def plot(self, u:Np1D, name : str = "u", save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, show : bool = True, **figspecs):
"""
Do some nice plots of the complex-valued function with given dofs.
Args:
u: numpy complex array with function dofs.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if len(what) == 0: return
if 'figsize' not in figspecs.keys():
figspecs['figsize'] = (13. * len(what) / 4, 3)
subplotcode = 100 + len(what) * 10
idxs = np.arange(self.spacedim())
plt.figure(**figspecs)
plt.jet()
if 'ABS' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.abs(u))
plt.title("|{0}|".format(name))
if 'PHASE' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.angle(u))
plt.title("phase({0})".format(name))
if 'REAL' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.real(u))
plt.title("Re({0})".format(name))
if 'IMAG' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.imag(u))
plt.title("Im({0})".format(name))
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_fig_".format(save), saveFormat),
format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
diff --git a/rrompy/hfengines/base/vector_problem_engine_base.py b/rrompy/hfengines/base/vector_problem_engine_base.py
index 7cc9a43..ab8e70c 100644
--- a/rrompy/hfengines/base/vector_problem_engine_base.py
+++ b/rrompy/hfengines/base/vector_problem_engine_base.py
@@ -1,201 +1,200 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import fenics as fen
import numpy as np
from matplotlib import pyplot as plt
from rrompy.utilities.base.types import Np1D, strLst
from rrompy.utilities.base import purgeList, getNewFilename
from .problem_engine_base import ProblemEngineBase
__all__ = ['VectorProblemEngineBase']
class VectorProblemEngineBase(ProblemEngineBase):
"""
Generic solver for parametric vector problems.
Attributes:
verbosity: Verbosity level.
BCManager: Boundary condition manager.
V: Real FE space.
u: Generic trial functions for variational form evaluation.
v: Generic test functions for variational form evaluation.
As: Scipy sparse array representation (in CSC format) of As.
bs: Numpy array representation of bs.
energyNormMatrix: Scipy sparse matrix representing inner product over
V.
bsmu: Mu value of last bs evaluation.
liftDirichletDatamu: Mu value of last Dirichlet datum evaluation.
liftedDirichletDatum: Dofs of Dirichlet datum lifting.
mu0BC: Mu value of last Dirichlet datum lifting.
degree_threshold: Threshold for ufl expression interpolation degree.
"""
nAs, nbs = 1, 1
- functional = lambda self, u: 0.
def __init__(self, degree_threshold : int = np.inf, verbosity : int = 10,
timestamp : bool = True):
super().__init__(degree_threshold = degree_threshold,
verbosity = verbosity, timestamp = timestamp)
self.V = fen.VectorFunctionSpace(fen.UnitSquareMesh(10, 10), "P", 1)
def plot(self, u:Np1D, name : str = "u", save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, show : bool = True, **figspecs):
"""
Do some nice plots of the complex-valued function with given dofs.
Args:
u: numpy complex array with function dofs.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if 'figsize' not in figspecs.keys():
figspecs['figsize'] = (13. * max(len(what), 1) / 4, 3)
if len(what) > 0:
for j in range(self.V.num_sub_spaces()):
subplotcode = 100 + len(what) * 10
II = self.V.sub(j).dofmap().dofs()
Vj = self.V.sub(j).collapse()
plt.figure(**figspecs)
plt.jet()
if 'ABS' in what:
uAb = fen.Function(Vj)
uAb.vector().set_local(np.abs(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fen.plot(uAb, title = "|{}_comp{}|".format(name, j))
plt.colorbar(p)
if 'PHASE' in what:
uPh = fen.Function(Vj)
uPh.vector().set_local(np.angle(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fen.plot(uPh, title = "phase({}_comp{})".format(name,
j))
plt.colorbar(p)
if 'REAL' in what:
uRe = fen.Function(Vj)
uRe.vector().set_local(np.real(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fen.plot(uRe, title = "Re({}_comp{})".format(name, j))
plt.colorbar(p)
if 'IMAG' in what:
uIm = fen.Function(Vj)
uIm.vector().set_local(np.imag(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fen.plot(uIm, title = "Im({}_comp{})".format(name, j))
plt.colorbar(p)
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_comp{}_fig_".format(save, j),
saveFormat),
format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
try:
if len(what) > 1:
figspecs['figsize'] = (2. / len(what) * figspecs['figsize'][0],
figspecs['figsize'][1])
elif len(what) == 0:
figspecs['figsize'] = (2. * figspecs['figsize'][0],
figspecs['figsize'][1])
if len(what) == 0 or 'ABS' in what or 'REAL' in what:
uVRe = fen.Function(self.V)
uVRe.vector().set_local(np.real(u))
plt.figure(**figspecs)
plt.jet()
p = fen.plot(uVRe, title = "{}_Re".format(name),
mode = "displacement")
plt.colorbar(p)
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_disp_Re_fig_".format(save),
saveFormat),
format = saveFormat, dpi = saveDPI)
plt.show()
plt.close()
if 'ABS' in what or 'IMAG' in what:
uVIm = fen.Function(self.V)
uVIm.vector().set_local(np.imag(u))
plt.figure(**figspecs)
plt.jet()
p = fen.plot(uVIm, title = "{}_Im".format(name),
mode = "displacement")
plt.colorbar(p)
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_disp_Im_fig_".format(save, j),
saveFormat),
format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
except:
pass
def plotmesh(self, name : str = "Mesh", save : str = None,
saveFormat : str = "eps", saveDPI : int = 100,
show : bool = True, **figspecs):
"""
Do a nice plot of the mesh.
Args:
u: numpy complex array with function dofs.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
plt.figure(**figspecs)
fen.plot(self.V.mesh())
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_msh_".format(save), saveFormat),
format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
diff --git a/rrompy/hfengines/linear_problem/helmholtz_square_bubble_domain_problem_engine.py b/rrompy/hfengines/linear_problem/helmholtz_square_bubble_domain_problem_engine.py
index 9d00525..679bc9c 100644
--- a/rrompy/hfengines/linear_problem/helmholtz_square_bubble_domain_problem_engine.py
+++ b/rrompy/hfengines/linear_problem/helmholtz_square_bubble_domain_problem_engine.py
@@ -1,245 +1,245 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
import scipy.sparse as scsp
import fenics as fen
from rrompy.utilities.base.types import Np1D, ScOp, Tuple, FenExpr
from rrompy.utilities.fenics import fenZERO
from .helmholtz_problem_engine import HelmholtzProblemEngine
from rrompy.utilities.base import verbosityDepth
__all__ = ['HelmholtzSquareBubbleDomainProblemEngine']
class HelmholtzSquareBubbleDomainProblemEngine(HelmholtzProblemEngine):
"""
Solver for square bubble Helmholtz problems with parametric domain heigth.
- \Delta u - kappa^2 * u = f in \Omega_mu = [0,\pi] x [0,\mu\pi]
u = 0 on \Gamma_mu = \partial\Omega_mu
with exact solution square bubble times plane wave.
"""
nAs, nbs = 3, 20
rescalingExp = 1.
def __init__(self, kappa:float, theta:float, n:int, mu0 : np.complex = 1.,
degree_threshold : int = np.inf, verbosity : int = 10,
timestamp : bool = True):
super().__init__(degree_threshold = degree_threshold,
verbosity = verbosity, timestamp = timestamp)
self.omega = kappa
self.kappa = kappa
self.theta = theta
self.mu0 = mu0
self.forcingTermMu = np.nan
mesh = fen.RectangleMesh(fen.Point(0,0), fen.Point(np.pi,np.pi), n, n)
self.V = fen.FunctionSpace(mesh, "P", 3)
def buildEnergyNormForm(self): # H1
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
mudx = np.abs(self.mu0) * fen.dot(self.u.dx(0), self.v.dx(0)) * fen.dx
muM = np.abs(self.mu0) * fen.dot(self.u, self.v) * fen.dx
imudy = 1. / np.abs(self.mu0) * (fen.dot(self.u.dx(1), self.v.dx(1))
* fen.dx)
normMatFen = fen.assemble(mudx + imudy + muM)
normMat = fen.as_backend_type(normMatFen).mat()
self.energyNormMatrix = scsp.csr_matrix(normMat.getValuesCSR()[::-1],
shape = normMat.size)
def getForcingTerm(self, mu:complex) -> Tuple[FenExpr, FenExpr]:
"""Compute forcing term."""
if not np.isclose(mu, self.forcingTermMu):
if self.verbosity >= 25:
verbosityDepth("INIT", ("Assembling base expression for "
"forcing term."),
timestamp = self.timestamp)
pi = np.pi
c, s = np.cos(self.theta), np.sin(self.theta)
x, y = fen.SpatialCoordinate(self.V.mesh())[:]
muR, muI = np.real(mu), np.imag(mu)
mu2R, mu2I = np.real(mu ** 2.), np.imag(mu ** 2.)
C = 16. / pi ** 4.
bR = C * (2 * (x * (pi - x) + y * (pi - y))
+ (self.kappa * s) ** 2. * (mu2R - 1.)
* x * (pi - x) * y * (pi - y))
bI = C * (2 * self.kappa * (c * (pi - 2 * x) * y * (pi - y)
+ s * x * (pi - x) * (pi - 2 * y))
+ (self.kappa * s) ** 2. * mu2I
* x * (pi - x) * y * (pi - y))
wR = (fen.cos(self.kappa * (c * x + s * muR * y))
* fen.exp(self.kappa * s * muI * y))
wI = (fen.sin(self.kappa * (c * x + s * muR * y))
* fen.exp(self.kappa * s * muI * y))
self.forcingTerm = [bR * wR + bI * wI, bI * wR - bR * wI]
self.forcingTermMu = mu
if self.verbosity >= 25:
verbosityDepth("DEL", "Done assembling base expression.",
timestamp = self.timestamp)
return self.forcingTerm
def getExtraFactorB(self, mu:complex, der:int) -> Tuple[FenExpr, FenExpr]:
"""Compute extra expression in RHS."""
def getPowMinusj(x, power):
powR = x ** power
powI = fenZERO
if power % 2 == 1:
powR, powI = powI, powR
if (power + 3) % 4 < 2:
powR, powI = - powR, - powI
return powR, powI
if self.verbosity >= 25:
verbosityDepth("INIT", ("Assembling auxiliary expression for "
"forcing term derivative."),
timestamp = self.timestamp)
- from math import factorial as fact
+ from scipy.special import factorial as fact
y = fen.SpatialCoordinate(self.V.mesh())[1]
powR, powI = [(self.kappa * np.sin(self.theta)) ** der * k\
for k in getPowMinusj(y, der)]
mu2R, mu2I = np.real(mu ** 2.), np.imag(mu ** 2.)
exprR = mu2R * powR - mu2I * powI
exprI = mu2I * powR + mu2R * powI
if der >= 1:
muR, muI = np.real(2. * mu), np.imag(2. * mu)
powR, powI = [(self.kappa * np.sin(self.theta)) ** (der - 1) * k\
* der for k in getPowMinusj(y, der - 1)]
exprR += muR * powR - muI * powI
exprI += muI * powR + muR * powI
if der >= 2:
powR, powI = [(self.kappa * np.sin(self.theta)) ** (der - 2) * k\
* der * (der - 1) for k in getPowMinusj(y, der - 2)]
exprR += powR
exprI += powI
fac = fact(der)
if self.verbosity >= 25:
verbosityDepth("DEL", "Done assembling auxiliary expression.",
timestamp = self.timestamp)
return [exprR / fac, exprI / fac]
def A(self, mu:complex, der : int = 0) -> ScOp:
"""Assemble (derivative of) operator of linear system."""
Anull = self.checkAInBounds(der)
if Anull is not None: return Anull
self.autoSetDS()
if der <= 0 and self.As[0] is None:
if self.verbosity >= 20:
verbosityDepth("INIT", "Assembling operator term A0.",
timestamp = self.timestamp)
DirichletBC0 = fen.DirichletBC(self.V, fenZERO,
self.DirichletBoundary)
a0Re = fen.dot(self.u.dx(1), self.v.dx(1)) * fen.dx
A0Re = fen.assemble(a0Re)
DirichletBC0.apply(A0Re)
A0ReMat = fen.as_backend_type(A0Re).mat()
A0Rer, A0Rec, A0Rev = A0ReMat.getValuesCSR()
self.As[0] = scsp.csr_matrix((A0Rev, A0Rec, A0Rer),
shape = A0ReMat.size,
dtype = np.complex)
if self.verbosity >= 20:
verbosityDepth("DEL", "Done assembling operator term.",
timestamp = self.timestamp)
if der <= 2 and self.As[2] is None:
if self.verbosity >= 20:
verbosityDepth("INIT", "Assembling operator term A2.",
timestamp = self.timestamp)
DirichletBC0 = fen.DirichletBC(self.V, fenZERO,
self.DirichletBoundary)
nRe, nIm = self.refractionIndex
n2Re, n2Im = nRe * nRe - nIm * nIm, 2 * nRe * nIm
k2Re, k2Im = np.real(self.omega ** 2), np.imag(self.omega ** 2)
k2n2Re = k2Re * n2Re - k2Im * n2Im
k2n2Im = k2Re * n2Im + k2Im * n2Re
parsRe = self.iterReduceQuadratureDegree(zip([k2n2Re],
["kappaSquaredRefractionIndexSquaredReal"]))
parsIm = self.iterReduceQuadratureDegree(zip([k2n2Im],
["kappaSquaredRefractionIndexSquaredImag"]))
a2Re = (fen.dot(self.u.dx(0), self.v.dx(0))
- k2n2Re * fen.dot(self.u, self.v)) * fen.dx
a2Im = - k2n2Im * fen.dot(self.u, self.v) * fen.dx
A2Re = fen.assemble(a2Re, form_compiler_parameters = parsRe)
A2Im = fen.assemble(a2Im, form_compiler_parameters = parsIm)
DirichletBC0.zero(A2Re)
DirichletBC0.zero(A2Im)
A2ReMat = fen.as_backend_type(A2Re).mat()
A2ImMat = fen.as_backend_type(A2Im).mat()
A2Rer, A2Rec, A2Rev = A2ReMat.getValuesCSR()
A2Imr, A2Imc, A2Imv = A2ImMat.getValuesCSR()
self.As[2] = (scsp.csr_matrix((A2Rev, A2Rec, A2Rer),
shape = A2ReMat.size)
+ 1.j * scsp.csr_matrix((A2Imv, A2Imc, A2Imr),
shape = A2ImMat.size))
if self.verbosity >= 20:
verbosityDepth("DEL", "Done assembling operator term.",
timestamp = self.timestamp)
if der == 0:
return self.As[0] + mu ** 2 * self.As[2]
if der == 1:
return 2. * mu * self.As[2]
return self.As[2]
def b(self, mu:complex, der : int = 0,
homogeneized : bool = False) -> Np1D:
"""Assemble (derivative of) RHS of linear system."""
bnull = self.checkbInBounds(der, homogeneized)
if bnull is not None: return bnull
if homogeneized and not np.isclose(self.mu0BC, mu):
self.u0BC = self.liftDirichletData(mu)
if not np.isclose(self.bsmu, mu):
self.bsmu = mu
self.resetbs()
b = self.bsH[der] if homogeneized else self.bs[der]
if b is None:
if self.verbosity >= 20:
verbosityDepth("INIT", ("Assembling forcing term "
"b{}.").format(der),
timestamp = self.timestamp)
if der < self.nbs:
fRe, fIm = self.getForcingTerm(mu)
cRe, cIm = self.getExtraFactorB(mu, der)
cfRe = cRe * fRe - cIm * fIm
cfIm = cRe * fIm + cIm * fRe
else:
cfRe, cfIm = fenZERO, fenZERO
parsRe = self.iterReduceQuadratureDegree(zip([cfRe],
["forcingTermDer{}Real".format(der)]))
parsIm = self.iterReduceQuadratureDegree(zip([cfIm],
["forcingTermDer{}Imag".format(der)]))
L0Re = fen.dot(cfRe, self.v) * fen.dx
L0Im = fen.dot(cfIm, self.v) * fen.dx
b0Re = fen.assemble(L0Re, form_compiler_parameters = parsRe)
b0Im = fen.assemble(L0Im, form_compiler_parameters = parsIm)
if homogeneized:
Ader = self.A(mu, der)
b0Re[:] -= np.real(Ader.dot(self.u0BC))
b0Im[:] -= np.imag(Ader.dot(self.u0BC))
DirichletBC0 = fen.DirichletBC(self.V, fenZERO,
self.DirichletBoundary)
DirichletBC0.apply(b0Re)
DirichletBC0.apply(b0Im)
b = np.array(b0Re[:] + 1.j * b0Im[:], dtype = np.complex)
if homogeneized:
self.bsH[der] = b
else:
self.bs[der] = b
if self.verbosity >= 20:
verbosityDepth("DEL", "Done assembling forcing term.",
timestamp = self.timestamp)
return b
diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py
index 2b43bb4..0f68452 100644
--- a/rrompy/reduction_methods/base/generic_approximant.py
+++ b/rrompy/reduction_methods/base/generic_approximant.py
@@ -1,617 +1,638 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from abc import abstractmethod
-import pickle
import numpy as np
from itertools import product as iterprod
from copy import deepcopy as copy
+from os import remove as osrm
from rrompy.sampling.base.sampling_engine_base import SamplingEngineBase
from rrompy.utilities.base.types import Np1D, DictAny, HFEng, sampleEng, strLst
from rrompy.utilities.base import purgeDict, verbosityDepth, getNewFilename
from rrompy.utilities.exception_manager import (RROMPyException, modeAssert,
RROMPy_READY, RROMPy_FRAGILE)
+from rrompy.utilities.base import pickleDump, pickleLoad
__all__ = ['GenericApproximant']
def addNormFieldToClass(self, fieldName):
def objFunc(self, mu:complex, homogeneized : bool = False) -> float:
getObj = getattr(self.__class__, "get" + fieldName)
return self.HFEngine.norm(getObj(self, mu, homogeneized))
setattr(self.__class__, "norm" + fieldName, objFunc)
def addPlotFieldToClass(self, fieldName):
def objFunc(self, mu:complex, name : str = fieldName, save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, show : bool = True,
homogeneized : bool = False, **figspecs):
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
self.HFEngine.plot(uV, name = name, save = save, what = what,
saveFormat = saveFormat, saveDPI = saveDPI,
show = show, **figspecs)
setattr(self.__class__, "plot" + fieldName, objFunc)
def addOutParaviewFieldToClass(self, fieldName):
def objFunc(self, mu:complex, name : str = fieldName,
filename : str = "out", time : float = 0.,
what : strLst = 'all', forceNewFile : bool = True,
folder : bool = False, filePW = None,
homogeneized : bool = False):
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
self.HFEngine.outParaview(uV, name = name, filename = filename,
time = time, what = what,
forceNewFile = forceNewFile,
folder = folder, filePW = filePW)
setattr(self.__class__, "outParaview" + fieldName, objFunc)
def addOutParaviewTimeDomainFieldToClass(self, fieldName):
def objFunc(self, mu:complex, omega : float = None,
timeFinal : float = None, periodResolution : int = 20,
name : str = fieldName, filename : str = "out",
forceNewFile : bool = True, folder : bool = False,
homogeneized : bool = False):
uV = getattr(self.__class__, "get" + fieldName)(self, mu, homogeneized)
if omega is None: omega = np.real(mu)
self.HFEngine.outParaviewTimeDomain(uV, omega = omega,
timeFinal = timeFinal,
periodResolution = periodResolution,
name = name, filename = filename,
forceNewFile = forceNewFile,
folder = folder)
setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc)
class GenericApproximant:
"""
ABSTRACT
ROM approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
trainedModel: Trained model evaluator.
mu0: Default parameter.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
samplingEngine: Sampling engine.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
uApp: Last evaluated approximant as numpy complex vector.
"""
__all__ += [ftype + dtype for ftype, dtype in iterprod(
["norm", "plot", "outParaview", "outParaviewTimeDomain"],
["HF", "RHS", "Approx", "Res", "Err"])]
def __init__(self, HFEngine:HFEng, mu0 : complex = 0,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._mode = RROMPy_READY
self.verbosity = verbosity
self.timestamp = timestamp
if self.verbosity >= 10:
verbosityDepth("INIT", ("Initializing approximant engine of "
"type {}.").format(self.name()),
timestamp = self.timestamp)
self.HFEngine = HFEngine
self._addParametersToList(["POD"])
self.mu0 = mu0
self.homogeneized = homogeneized
self.approxParameters = approxParameters
self._postInit()
### add norm{HF,RHS,Approx,Res,Err} methods
"""
Compute norm of * at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Target norm of *.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addNormFieldToClass(self, objName)
### add plot{HF,RHS,Approx,Res,Err} methods
"""
Do some nice plots of * at arbitrary parameter.
Args:
mu: Target parameter.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addPlotFieldToClass(self, objName)
### add outParaview{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file.
Args:
mu: Target parameter.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
time(optional): Timestamp.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
filePW(optional): Fenics File entity (for time series).
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewFieldToClass(self, objName)
### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file, converted to time domain.
Args:
mu: Target parameter.
omega(optional): frequency.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewTimeDomainFieldToClass(self, objName)
def _preInit(self):
if not hasattr(self, "depth"): self.depth = 0
else: self.depth += 1
def _addParametersToList(self, what:strLst):
if not hasattr(self, "parameterList"):
self.parameterList = []
self.parameterList += what
def _postInit(self):
if self.depth == 0:
if self.verbosity >= 10:
verbosityDepth("DEL", "Done initializing.",
timestamp = self.timestamp)
del self.depth
else: self.depth -= 1
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def setupSampling(self, SamplingEngine : sampleEng = SamplingEngineBase):
"""Setup sampling engine."""
modeAssert(self._mode, message = "Cannot setup sampling engine.")
self.samplingEngine = SamplingEngine(self.HFEngine,
verbosity = self.verbosity)
@property
def mu0(self):
"""Value of mu0."""
return self._mu0
@mu0.setter
def mu0(self, mu0):
if not (hasattr(self, "_mu0") and np.isclose(mu0, self.mu0)):
self.resetSamples()
self._mu0 = mu0
@property
def approxParameters(self):
"""Value of approximant parameters."""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
if not hasattr(self, "approxParameters"):
self._approxParameters = {}
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
keyList = list(approxParameters.keys())
if "POD" in keyList:
self.POD = approxParameters["POD"]
elif not hasattr(self, "_POD") or self._POD is None:
self.POD = True
@property
def POD(self):
"""Value of POD."""
return self._POD
@POD.setter
def POD(self, POD):
if hasattr(self, "_POD"): PODold = self.POD
else: PODold = -1
self._POD = POD
self._approxParameters["POD"] = self.POD
if PODold != self.POD:
self.samplingEngine = None
self.resetSamples()
@property
def homogeneized(self):
"""Value of homogeneized."""
return self._homogeneized
@homogeneized.setter
def homogeneized(self, homogeneized):
if not hasattr(self, "_homogeneized"):
self._homogeneized = None
if homogeneized != self.homogeneized:
self._homogeneized = homogeneized
self.resetSamples()
+ def setHF(self, muHF:complex, uHF:Np1D):
+ """Assign high fidelity solution."""
+ self.lastSolvedHF = copy(muHF)
+ self.uHF = copy(uHF)
+
def solveHF(self, mu : complex = None):
"""
Find high fidelity solution with original parameters and arbitrary
parameter.
Args:
mu: Target parameter.
"""
if mu is None: mu = self.mu0
- if (not hasattr(self, "lastSolvedHF")
- or not np.isclose(self.lastSolvedHF, mu)):
- self.uHF = self.samplingEngine.solveLS(mu,
- homogeneized = self.homogeneized)
- self.lastSolvedHF = mu
+ if not np.isclose(self.lastSolvedHF, mu):
+ self.setHF(mu, self.samplingEngine.solveLS(mu,
+ homogeneized = self.homogeneized))
def resetSamples(self):
"""Reset samples."""
if hasattr(self, "samplingEngine") and self.samplingEngine is not None:
self.samplingEngine.resetHistory()
else:
self.setupSampling()
self.trainedModel = None
+ self.setHF(np.nan, None)
self._mode = RROMPy_READY
def plotSamples(self, name : str = "u", save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, **figspecs):
"""
Do some nice plots of the samples.
Args:
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
modeAssert(self._mode, message = "Cannot plot samples.")
self.samplingEngine.plotSamples(name = name, save = save, what = what,
saveFormat = saveFormat,
saveDPI = saveDPI,
**figspecs)
def outParaviewSamples(self, name : str = "u", filename : str = "out",
times : Np1D = None, what : strLst = 'all',
forceNewFile : bool = True, folders : bool = False,
filePW = None):
"""
Output samples to ParaView file.
Args:
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
times(optional): Timestamps.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
folders(optional): Whether to split output in folders.
filePW(optional): Fenics File entity (for time series).
"""
modeAssert(self._mode, message = "Cannot output samples.")
self.samplingEngine.outParaviewSamples(name = name,
filename = filename,
times = times, what = what,
forceNewFile = forceNewFile,
folders = folders,
filePW = filePW)
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
timeFinal : Np1D = None,
periodResolution : int = 20,
name : str = "u",
filename : str = "out",
forceNewFile : bool = True,
folders : bool = False):
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
folders(optional): Whether to split output in folders.
"""
modeAssert(self._mode, message = "Cannot output samples.")
self.samplingEngine.outParaviewTimeDomainSamples(omegas = omegas,
timeFinal = timeFinal,
periodResolution = periodResolution,
name = name, filename = filename,
forceNewFile = forceNewFile,
folders = folders)
+ def setApprox(self, model):
+ """Deepcopy approximation from trained model."""
+ if hasattr(model, "storeTrainedModel"):
+ verb = model.verbosity
+ model.verbosity = 0
+ fileOut = model.storeTrainedModel()
+ model.verbosity = verb
+ else:
+ try:
+ fileOut = getNewFilename("trained_model", "pkl")
+ pickleDump(model.data.__dict__, fileOut)
+ except:
+ RROMPyException(("Failed to store model data. Parameter model "
+ "must have either storeTrainedModel or "
+ "data.__dict__ properties."))
+ self.loadTrainedModel(fileOut)
+ osrm(fileOut)
+
@abstractmethod
def setupApprox(self):
"""
Setup approximant. (ABSTRACT)
Any specialization should include something like
if self.checkComputedApprox():
return
modeAssert(self._mode, message = "Cannot setup approximant.")
...
self.trainedModel = ...
self.trainedModel.data = ...
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
"""
pass
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None
and self.trainedModel.data.approxParameters == self.approxParameters)
def evalApproxReduced(self, mu:complex):
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
self.setupApprox()
self.uAppReduced = self.trainedModel.getApproxReduced(mu)
def evalApprox(self, mu:complex):
"""
Evaluate approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
self.setupApprox()
self.uApp = self.trainedModel.getApprox(mu)
def getHF(self, mu:complex, homogeneized : bool = False) -> Np1D:
"""
Get HF solution at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
HFsolution.
"""
self.solveHF(mu)
if self.homogeneized and not homogeneized:
return self.uHF + self.HFEngine.liftDirichletData(mu)
if not self.homogeneized and homogeneized:
return self.uHF - self.HFEngine.liftDirichletData(mu)
return self.uHF
def getRHS(self, mu:complex, homogeneized : bool = False) -> Np1D:
"""
Get linear system RHS at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Linear system RHS.
"""
return self.HFEngine.residual(None, mu, homogeneized = homogeneized)
def getApproxReduced(self, mu:complex) -> Np1D:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Reduced approximant.
"""
self.evalApproxReduced(mu)
return self.uAppReduced
def getApprox(self, mu:complex, homogeneized : bool = False) -> Np1D:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant.
"""
self.evalApprox(mu)
if self.homogeneized and not homogeneized:
return self.uApp + self.HFEngine.liftDirichletData(mu)
if not self.homogeneized and homogeneized:
return self.uApp - self.HFEngine.liftDirichletData(mu)
return self.uApp
def getRes(self, mu:complex, homogeneized : bool = False) -> Np1D:
"""
Get residual at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant residual.
"""
return self.HFEngine.residual(self.getApprox(mu, homogeneized), mu,
homogeneized = homogeneized)
def getErr(self, mu:complex, homogeneized : bool = False) -> Np1D:
"""
Get error at arbitrary parameter.
Args:
mu: Target parameter.
homogeneized(optional): Whether to remove Dirichlet BC. Defaults to
False.
Returns:
Approximant error.
"""
return self.getApprox(mu, homogeneized) - self.getHF(mu, homogeneized)
def getPoles(self) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
self.setupApprox()
if self.verbosity >= 20:
verbosityDepth("INIT", "Computing poles of model.",
timestamp = self.timestamp)
poles = self.trainedModel.getPoles()
if self.verbosity >= 20:
verbosityDepth("DEL", "Done computing poles.",
timestamp = self.timestamp)
return poles
def storeTrainedModel(self, filenameBase : str = "trained_model",
forceNewFile : bool = True):
"""Store trained reduced model to file."""
self.setupApprox()
if self.verbosity >= 20:
verbosityDepth("INIT", "Storing trained model to file.",
timestamp = self.timestamp)
if forceNewFile:
filename = getNewFilename(filenameBase, "pkl")
else:
filename = "{}.pkl".format(filenameBase)
- with open(filename, "wb") as fileOut:
- pickle.dump(self.trainedModel.data.__dict__, fileOut)
+ pickleDump(self.trainedModel.data.__dict__, filename)
if self.verbosity >= 20:
verbosityDepth("DEL", "Done storing trained model.",
timestamp = self.timestamp)
return filename
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
if self.verbosity >= 20:
verbosityDepth("INIT", "Loading pre-trained model from file.",
timestamp = self.timestamp)
- with open(filename, "rb") as fileIn:
- datadict = pickle.load(fileIn)
+ datadict = pickleLoad(filename)
name = datadict.pop("name")
if name == "TrainedModelPade":
from rrompy.reduction_methods.trained_model import \
TrainedModelPade as tModel
elif name == "TrainedModelRB":
from rrompy.reduction_methods.trained_model import \
TrainedModelRB as tModel
else:
raise RROMPyException(("Trained model name not recognized. "
"Loading failed."))
self.mu0 = datadict.pop("mu0")
from rrompy.reduction_methods.trained_model import TrainedModelData
trainedModel = tModel()
trainedModel.verbosity = self.verbosity
trainedModel.timestamp = self.timestamp
data = TrainedModelData(name, self.mu0, datadict.pop("projMat"),
datadict.pop("rescalingExp"))
if "mus" in datadict:
data.mus = datadict.pop("mus")
approxParameters = datadict.pop("approxParameters")
data.approxParameters = copy(approxParameters)
if "sampler" in approxParameters:
self._approxParameters["sampler"] = approxParameters.pop("sampler")
self.approxParameters = copy(approxParameters)
if "mus" in data.__dict__:
- self.mus = np.copy(data.mus)
+ self.mus = copy(data.mus)
if name == "TrainedModelPade":
self.scaleFactor = datadict.pop("scaleFactor")
data.scaleFactor = self.scaleFactor
for key in datadict:
setattr(data, key, datadict[key])
trainedModel.data = data
self.trainedModel = trainedModel
self._mode = RROMPy_FRAGILE
if self.verbosity >= 20:
verbosityDepth("DEL", "Done loading pre-trained model.",
timestamp = self.timestamp)
diff --git a/rrompy/reduction_methods/base/rb_utils.py b/rrompy/reduction_methods/base/rb_utils.py
index e7b8691..4ef9124 100644
--- a/rrompy/reduction_methods/base/rb_utils.py
+++ b/rrompy/reduction_methods/base/rb_utils.py
@@ -1,56 +1,58 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
+#from copy import deepcopy as copy
from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List
__all__ = ['projectAffineDecomposition']
def projectAffineDecomposition(As:List[Np2D], bs:List[Np1D], pMat:Np2D,
ARBsOld : List[Np2D] = None,
bRBsOld : List[Np1D] = None,
pMatOld : Np2D = None)\
-> Tuple[List[Np2D], List[Np1D]]:
"""Project affine decomposition of linear system onto basis."""
assert((ARBsOld is None) == (pMatOld is None)
and (bRBsOld is None) == (pMatOld is None))
pMatH = pMat.T.conj()
ARBs = [None] * len(As)
bRBs = [None] * len(bs)
if pMatOld is None:
for j in range(len(As)):
ARBs[j] = pMatH.dot(As[j].dot(pMat))
for j in range(len(bs)):
bRBs[j] = pMatH.dot(bs[j])
else:
assert(len(ARBsOld) == len(As) and len(bRBsOld) == len(bs))
pMatOldH = pMatOld.T.conj()
Sold = pMatOld.shape[1]
Snew = pMat.shape[1]
for j in range(len(As)):
ARBs[j] = np.empty((Sold + Snew, Sold + Snew), dtype = np.complex)
ARBs[j][: Sold, : Sold] = ARBsOld[j]
ARBs[j][: Sold, Sold :] = pMatOldH.dot(As[j].dot(pMat))
ARBs[j][Sold :, : Sold] = pMatH.dot(As[j].dot(pMatOld))
ARBs[j][Sold :, Sold :] = pMatH.dot(As[j].dot(pMat))
for j in range(len(bs)):
bRBs[j] = np.empty((Sold + Snew), dtype = np.complex)
- bRBs[j][: Sold] = np.copy(bRBsOld[j])
+ bRBs[j][: Sold] = bRBsOld[j]
+# bRBs[j][: Sold] = copy(bRBsOld[j])
bRBs[j][Sold :] = pMatH.dot(bs[j])
return ARBs, bRBs
diff --git a/rrompy/reduction_methods/centered/rational_pade.py b/rrompy/reduction_methods/centered/rational_pade.py
index 58b51a9..5846c74 100644
--- a/rrompy/reduction_methods/centered/rational_pade.py
+++ b/rrompy/reduction_methods/centered/rational_pade.py
@@ -1,456 +1,456 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from copy import copy
+from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from rrompy.reduction_methods.trained_model import (TrainedModelData,
TrainedModelPade as tModel)
from .generic_centered_approximant import GenericCenteredApproximant
from rrompy.sampling.base.pod_engine import PODEngine
from rrompy.utilities.base.types import Np1D, Np2D, Tuple, DictAny, HFEng
from rrompy.utilities.base import verbosityDepth, purgeDict
from rrompy.utilities.exception_manager import (RROMPyException, modeAssert,
RROMPyWarning)
__all__ = ['RationalPade']
class RationalPade(GenericCenteredApproximant):
"""
ROM single-point fast Pade' approximant computation for parametric
problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'rho': weight for computation of original Pade' approximant;
defaults to np.inf, i.e. fast approximant;
- 'M': degree of Pade' approximant numerator; defaults to 0;
- 'N': degree of Pade' approximant denominator; defaults to 0;
- 'E': total number of derivatives current approximant relies upon;
defaults to 1;
- 'robustTol': tolerance for robust Pade' denominator management;
defaults to 0;
- 'sampleType': label of sampling type; available values are:
- 'ARNOLDI': orthogonalization of solution derivatives through
Arnoldi algorithm;
- 'KRYLOV': standard computation of solution derivatives.
Defaults to 'KRYLOV'.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'rho': weight for computation of original Pade' approximant;
- 'M': degree of Pade' approximant numerator;
- 'N': degree of Pade' approximant denominator;
- 'E': total number of derivatives current approximant relies upon;
- 'robustTol': tolerance for robust Pade' denominator management;
- 'sampleType': label of sampling type.
POD: Whether to compute QR factorization of derivatives.
rho: Weight of approximant.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
E: Number of solution derivatives over which current approximant is
based upon.
robustTol: Tolerance for robust Pade' denominator management.
sampleType: Label of sampling type.
initialHFData: HF problem initial data.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
G: Square Numpy 2D vector of size (N+1) corresponding to Pade'
denominator matrix (see paper).
uApp: Last evaluated approximant as numpy complex vector.
"""
def __init__(self, HFEngine:HFEng, mu0 : complex = 0,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["M", "N", "robustTol", "rho"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def approxParameters(self):
"""Value of approximant parameters."""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
approxParametersCopy = purgeDict(approxParameters,
["M", "N", "robustTol", "rho"],
True, True, baselevel = 1)
keyList = list(approxParameters.keys())
if "rho" in keyList:
self._rho = approxParameters["rho"]
elif not hasattr(self, "_rho") or self.rho is None:
self._rho = np.inf
GenericCenteredApproximant.approxParameters.fset(self,
approxParametersCopy)
self.rho = self._rho
if "robustTol" in keyList:
self.robustTol = approxParameters["robustTol"]
elif not hasattr(self, "_robustTol") or self._robustTol is None:
self.robustTol = 0
self._ignoreParWarnings = True
if "M" in keyList:
self.M = approxParameters["M"]
elif hasattr(self, "_M") and self._M is not None:
self.M = self.M
else:
self.M = 0
del self._ignoreParWarnings
if "N" in keyList:
self.N = approxParameters["N"]
elif hasattr(self, "_N") and self._N is not None:
self.N = self.N
else:
self.N = 0
@property
def rho(self):
"""Value of rho."""
return self._rho
@rho.setter
def rho(self, rho):
self._rho = np.abs(rho)
self._approxParameters["rho"] = self.rho
@property
def M(self):
"""Value of M. Its assignment may change E."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
if not hasattr(self, "_ignoreParWarnings"):
self.checkMNE()
@property
def N(self):
"""Value of N. Its assignment may change E."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
if not hasattr(self, "_ignoreParWarnings"):
self.checkMNE()
def checkMNE(self):
"""Check consistency of M, N, and E."""
if not hasattr(self, "_E") or self.E is None: return
M = self.M if (hasattr(self, "_M") and self.M is not None) else 0
N = self.N if (hasattr(self, "_N") and self.N is not None) else 0
msg = "max(M, N)" if self.rho == np.inf else "M + N"
bound = eval(msg)
if self.E < bound:
RROMPyWarning(("Prescribed E is too small. Updating E to "
"{}.").format(msg))
self.E = bound
del M, N
@property
def robustTol(self):
"""Value of tolerance for robust Pade' denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def E(self):
"""Value of E."""
return self._E
@E.setter
def E(self, E):
GenericCenteredApproximant.E.fset(self, E)
self.checkMNE()
def _setupDenominator(self):
"""Compute Pade' denominator."""
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of denominator.",
timestamp = self.timestamp)
while self.N > 0:
if self.POD:
ev, eV = self.findeveVGQR()
else:
ev, eV = self.findeveVGExplicit()
newParameters = checkRobustTolerance(ev, self.E, self.robustTol)
if not newParameters:
break
self.approxParameters = newParameters
if self.N <= 0:
eV = np.ones((1, 1))
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing denominator.",
timestamp = self.timestamp)
return eV[::-1, 0]
def _setupNumerator(self):
"""Compute Pade' numerator."""
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of numerator.",
timestamp = self.timestamp)
P = np.zeros((self.E + 1, self.M + 1), dtype = np.complex)
for i in range(self.E + 1):
l = min(self.M + 1, i + self.N + 1)
if i < l:
P[i, i : l] = self.trainedModel.data.Q[: l - i]
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing numerator.",
timestamp = self.timestamp)
return self.rescaleParameter(P.T).T
def setupApprox(self):
"""
Compute Pade' approximant. SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeDerivatives()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
None, self.HFEngine.rescalingExp)
data.polytype = "MONOMIAL"
self.trainedModel.data = data
if self.N > 0:
Q = self._setupDenominator()
else:
Q = np.ones(1, dtype = np.complex)
- self.trainedModel.data.Q = np.copy(Q)
+ self.trainedModel.data.Q = copy(Q)
self.trainedModel.data.scaleFactor = self.scaleFactor
- self.trainedModel.data.projMat = (
- self.samplingEngine.samples[:, : self.E + 1])
+ self.trainedModel.data.projMat = copy(self.samplingEngine.samples[:,
+ : self.E + 1])
P = self._setupNumerator()
if self.sampleType == "ARNOLDI":
P = self.samplingEngine.RArnoldi.dot(P)
- self.trainedModel.data.P = np.copy(P)
+ self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def rescaleParameter(self, R:Np2D, A : Np2D = None,
exponent : float = 1.) -> Np2D:
"""
Prepare parameter rescaling.
Args:
R: Matrix whose columns need rescaling.
A(optional): Matrix whose diagonal defines scaling factor. If None,
previous value of scaleFactor is used. Defaults to None.
exponent(optional): Exponent of scaling factor in matrix diagonal.
Defaults to 1.
Returns:
Rescaled matrix.
"""
modeAssert(self._mode, message = "Cannot compute rescaling factor.")
if A is not None:
aDiag = np.diag(A)
scaleCoeffs = np.polyfit(np.arange(A.shape[1]),
np.log(aDiag), 1)
self.scaleFactor = np.exp(- scaleCoeffs[0] / exponent)
return np.multiply(R, np.power(self.scaleFactor,np.arange(R.shape[1])))
def buildG(self):
"""Assemble Pade' denominator matrix."""
modeAssert(self._mode, message = "Cannot compute G matrix.")
self.computeDerivatives()
if self.verbosity >= 10:
verbosityDepth("INIT", "Building gramian matrix.",
timestamp = self.timestamp)
if self.rho == np.inf:
Nmin = self.E - self.N
else:
Nmin = self.M - self.N + 1
if self.sampleType == "KRYLOV":
DerE = self.samplingEngine.samples[:, Nmin : self.E + 1]
G = self.HFEngine.innerProduct(DerE, DerE)
DerE = self.rescaleParameter(DerE, G, 2.)
G = self.HFEngine.innerProduct(DerE, DerE)
else:
RArnE = self.samplingEngine.RArnoldi[: self.E + 1,
Nmin : self.E + 1]
RArnE = self.rescaleParameter(RArnE, RArnE[Nmin :, :])
G = RArnE.T.conj().dot(RArnE)
if self.rho == np.inf:
self.G = G
else:
Gbig = G
self.G = np.zeros((self.N + 1, self.N + 1), dtype = np.complex)
for k in range(self.E - self.M):
self.G += self.rho ** (2 * k) * Gbig[k : k + self.N + 1,
k : k + self.N + 1]
if self.verbosity >= 10:
verbosityDepth("DEL", "Done building gramian.",
timestamp = self.timestamp)
def findeveVGExplicit(self) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of Pade' denominator
matrix.
"""
modeAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self.buildG()
if self.verbosity >= 7:
verbosityDepth("INIT",
"Solving eigenvalue problem for gramian matrix.",
timestamp = self.timestamp)
ev, eV = np.linalg.eigh(self.G)
if self.verbosity >= 5:
try: condev = ev[-1] / ev[0]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved eigenvalue problem of size {} "
"with condition number {:.4e}.").format(
self.N + 1,
condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return ev, eV
def findeveVGQR(self) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of Pade' denominator matrix
through SVD of R factor. See ``Householder triangularization of a
quasimatrix'', L.Trefethen, 2008 for QR algorithm.
Returns:
Eigenvalues in ascending order and corresponding eigenvector
matrix.
"""
modeAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self.computeDerivatives()
if self.rho == np.inf:
Nmin = self.E - self.N
else:
Nmin = self.M - self.N + 1
if self.sampleType == "KRYLOV":
A = copy(self.samplingEngine.samples[:, Nmin : self.E + 1])
self.PODEngine = PODEngine(self.HFEngine)
if self.verbosity >= 10:
verbosityDepth("INIT", "Orthogonalizing samples.",
timestamp = self.timestamp)
R = self.PODEngine.QRHouseholder(A, only_R = True)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done orthogonalizing samples.",
timestamp = self.timestamp)
else:
R = self.samplingEngine.RArnoldi[: self.E + 1, Nmin : self.E + 1]
R = self.rescaleParameter(R, R[R.shape[0] - R.shape[1] :, :])
if self.rho == np.inf:
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving svd for square root of "
"gramian matrix."),
timestamp = self.timestamp)
sizeI = R.shape[0]
_, s, V = np.linalg.svd(R, full_matrices = False)
else:
if self.verbosity >= 10:
verbosityDepth("INIT", ("Building matrix stack for square "
"root of gramian."),
timestamp = self.timestamp)
Rtower = np.zeros((R.shape[0] * (self.E - self.M), self.N + 1),
dtype = np.complex)
for k in range(self.E - self.M):
RTleft = max(0, self.N - self.M - k)
Rleft = max(0, self.M - self.N + k)
Rtower[k * R.shape[0] : (k + 1) * R.shape[0], RTleft :] = (
self.rho ** k * R[:, Rleft : self.M + 1 + k])
if self.verbosity >= 10:
verbosityDepth("DEL", "Done building matrix stack.",
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving svd for square root of "
"gramian matrix."),
timestamp = self.timestamp)
sizeI = Rtower.shape[0]
_, s, V = np.linalg.svd(Rtower, full_matrices = False)
eV = V[::-1, :].T.conj()
if self.verbosity >= 5:
try: condev = s[0] / s[-1]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved svd problem of size {} x {} with "
"condition number {:.4e}.").format(sizeI,
self.N + 1,
condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return s[::-1], eV
def radiusPade(self, mu:Np1D, mu0 : float = None) -> float:
"""
Compute translated radius to be plugged into Pade' approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Translated radius to be plugged into Pade' approximant.
"""
return self.trainedModel.radiusPade(mu, mu0)
def getResidues(self) -> Np1D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues()
diff --git a/rrompy/reduction_methods/centered/rb_centered.py b/rrompy/reduction_methods/centered/rb_centered.py
index 7d8a1c0..1ba42ee 100644
--- a/rrompy/reduction_methods/centered/rb_centered.py
+++ b/rrompy/reduction_methods/centered/rb_centered.py
@@ -1,230 +1,230 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from copy import copy
+from copy import deepcopy as copy
import numpy as np
from .generic_centered_approximant import GenericCenteredApproximant
from rrompy.reduction_methods.trained_model import TrainedModelRB as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.reduction_methods.base.rb_utils import projectAffineDecomposition
from rrompy.sampling.base.pod_engine import PODEngine
from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List, DictAny, HFEng
from rrompy.utilities.base import purgeDict, verbosityDepth
from rrompy.utilities.exception_manager import RROMPyWarning
__all__ = ['RBCentered']
class RBCentered(GenericCenteredApproximant):
"""
ROM single-point fast RB approximant computation for parametric problems
with polynomial dependence up to degree 2.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'R': rank for Galerkin projection; defaults to E + 1;
- 'E': total number of derivatives current approximant relies upon;
defaults to 1;
- 'sampleType': label of sampling type; available values are:
- 'ARNOLDI': orthogonalization of solution derivatives through
Arnoldi algorithm;
- 'KRYLOV': standard computation of solution derivatives.
Defaults to 'KRYLOV'.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'R': rank for Galerkin projection;
- 'E': total number of derivatives current approximant relies upon;
- 'sampleType': label of sampling type.
POD: Whether to compute QR factorization of derivatives.
R: Rank for Galerkin projection.
E: Number of solution derivatives over which current approximant is
based upon.
sampleType: Label of sampling type, i.e. 'KRYLOV'.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
uApp: Last evaluated approximant as numpy complex vector.
ARBs: List of sparse matrices (in CSC format) representing RB
coefficients of linear system matrix wrt mu.
bRBs: List of numpy vectors representing RB coefficients of linear
system RHS wrt mu.
"""
def __init__(self, HFEngine:HFEng, mu0 : complex = 0,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["R"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
if self.verbosity >= 10:
verbosityDepth("INIT", "Computing affine blocks of system.",
timestamp = self.timestamp)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done computing affine blocks.",
timestamp = self.timestamp)
self._postInit()
@property
def approxParameters(self):
"""
Value of approximant parameters. Its assignment may change M, N and S.
"""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
approxParametersCopy = purgeDict(approxParameters, ["R"],
True, True, baselevel = 1)
GenericCenteredApproximant.approxParameters.fset(self,
approxParametersCopy)
keyList = list(approxParameters.keys())
if "R" in keyList:
self.R = approxParameters["R"]
else:
self.R = self.E + 1
@property
def POD(self):
"""Value of POD."""
return self._POD
@POD.setter
def POD(self, POD):
GenericCenteredApproximant.POD.fset(self, POD)
if (hasattr(self, "_sampleType") and self.sampleType == "ARNOLDI"
and not self.POD):
RROMPyWarning(("Arnoldi sampling implicitly forces POD-type "
"derivative management."))
@property
def sampleType(self):
"""Value of sampleType."""
return self._sampleType
@sampleType.setter
def sampleType(self, sampleType):
GenericCenteredApproximant.sampleType.fset(self, sampleType)
if (hasattr(self, "_POD") and not self.POD
and self.sampleType == "ARNOLDI"):
RROMPyWarning(("Arnoldi sampling implicitly forces POD-type "
"derivative management."))
@property
def R(self):
"""Value of R. Its assignment may change S."""
return self._R
@R.setter
def R(self, R):
if R < 0: raise RROMPyException("R must be non-negative.")
self._R = R
self._approxParameters["R"] = self.R
if hasattr(self, "_E") and self.E + 1 < self.R:
RROMPyWarning("Prescribed E is too small. Updating E to R - 1.")
self.E = self.R - 1
def setupApprox(self):
"""Setup RB system."""
if self.checkComputedApprox():
return
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeDerivatives()
if self.verbosity >= 7:
verbosityDepth("INIT", "Computing projection matrix.",
timestamp = self.timestamp)
if self.POD and not self.sampleType == "ARNOLDI":
self.PODEngine = PODEngine(self.HFEngine)
pMatQ, pMatR = self.PODEngine.QRHouseholder(
self.samplingEngine.samples)
if self.POD:
if self.sampleType == "ARNOLDI":
pMatR = self.samplingEngine.RArnoldi
pMatQ = self.samplingEngine.samples
U, _, _ = np.linalg.svd(pMatR[: self.E + 1, : self.E + 1])
pMat = pMatQ[:, : self.E + 1].dot(U[:, : self.R])
else:
pMat = self.samplingEngine.samples[:, : self.R]
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
- np.copy(pMat), self.HFEngine.rescalingExp)
+ pMat, self.HFEngine.rescalingExp)
data.thetaAs = self.HFEngine.affineWeightsA(self.mu0)
data.thetabs = self.HFEngine.affineWeightsb(self.mu0,
self.homogeneized)
data.ARBs, data.bRBs = self.assembleReducedSystem(pMat)
self.trainedModel.data = data
else:
pMatOld = self.trainedModel.data.projMat
Sold = pMatOld.shape[1]
ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMatOld)
self.trainedModel.data.ARBs = ARBs
self.trainedModel.data.bRBs = bRBs
- self.trainedModel.data.projMat = np.copy(pMat)
+ self.trainedModel.data.projMat = copy(pMat)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing projection matrix.",
timestamp = self.timestamp)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def assembleReducedSystem(self, pMat : Np2D = None, pMatOld : Np2D = None)\
-> Tuple[List[Np2D], List[Np1D]]:
"""Build affine blocks of RB linear system through projections."""
if pMat is None:
self.setupApprox()
ARBs = self.trainedModel.data.ARBs
bRBs = self.trainedModel.data.bRBs
else:
if self.verbosity >= 10:
verbosityDepth("INIT", "Projecting affine terms of HF model.",
timestamp = self.timestamp)
As = self.HFEngine.affineLinearSystemA(self.mu0)
bs = self.HFEngine.affineLinearSystemb(self.mu0, self.homogeneized)
ARBsOld = None if pMatOld is None else self.trainedModel.data.ARBs
bRBsOld = None if pMatOld is None else self.trainedModel.data.bRBs
ARBs, bRBs = projectAffineDecomposition(As, bs, pMat, ARBsOld,
bRBsOld, pMatOld)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done projecting affine terms.",
timestamp = self.timestamp)
return ARBs, bRBs
diff --git a/rrompy/reduction_methods/distributed/rational_interpolant.py b/rrompy/reduction_methods/distributed/rational_interpolant.py
index 554c8c0..085e8c7 100644
--- a/rrompy/reduction_methods/distributed/rational_interpolant.py
+++ b/rrompy/reduction_methods/distributed/rational_interpolant.py
@@ -1,525 +1,524 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from copy import copy
+from copy import deepcopy as copy
import numpy as np
from scipy.special import factorial as fact
from rrompy.reduction_methods.base import checkRobustTolerance
from .generic_distributed_approximant import GenericDistributedApproximant
from rrompy.utilities.poly_fitting import (polybases, polyvander, polyfitname,
customFit)
from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import Np1D, Np2D, HFEng, DictAny, Tuple
from rrompy.utilities.base import verbosityDepth, purgeDict
from rrompy.utilities.exception_manager import (RROMPyException, modeAssert,
RROMPyWarning)
__all__ = ['RationalInterpolant']
class RationalInterpolant(GenericDistributedApproximant):
"""
ROM rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'muBounds': list of bounds for parameter values; defaults to
[0, 1];
- 'S': total number of samples current approximant relies upon;
defaults to 2;
- 'sampler': sample point generator; defaults to uniform sampler on
muBounds;
- 'polybasis': type of polynomial basis for interpolation; allowed
values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults
to 'MONOMIAL';
- 'E': coefficient of interpolant to be minimized; defaults to
min(S, M + 1);
- 'M': degree of Pade' interpolant numerator; defaults to 0;
- 'N': degree of Pade' interpolant denominator; defaults to 0;
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust Pade' denominator management;
defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
ws: Array of snapshot weigths.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'muBounds': list of bounds for parameter values;
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator;
- 'polybasis': type of polynomial basis for interpolation;
- 'E': coefficient of interpolant to be minimized;
- 'M': degree of Pade' interpolant numerator;
- 'N': degree of Pade' interpolant denominator;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
- 'robustTol': tolerance for robust Pade' denominator management.
extraApproxParameters: List of approxParameters keys in addition to
mother class's.
POD: Whether to compute POD of snapshots.
muBounds: list of bounds for parameter values.
S: Number of solution snapshots over which current approximant is
based upon.
sampler: Sample point generator.
polybasis: type of polynomial basis for interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
interpRcond: Tolerance for interpolation via numpy.polyfit.
robustTol: Tolerance for robust Pade' denominator management.
samplingEngine: Sampling engine.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
uApp: Last evaluated approximant as numpy complex vector.
"""
def __init__(self, HFEngine:HFEng, mu0 : complex = 0.,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["polybasis", "E", "M", "N",
"interpRcond", "robustTol"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def approxParameters(self):
"""
Value of approximant parameters.
"""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
approxParametersCopy = purgeDict(approxParameters, ["polybasis",
"E", "M", "N",
"interpRcond",
"robustTol"],
True, True, baselevel = 1)
if hasattr(self, "_M") and self.M is not None:
Mold = self.M
self._M = 0
if hasattr(self, "_N") and self.N is not None:
Nold = self.N
self._N = 0
if hasattr(self, "_E") and self.E is not None:
self._E = 0
GenericDistributedApproximant.approxParameters.fset(self,
approxParametersCopy)
keyList = list(approxParameters.keys())
if "polybasis" in keyList:
self.polybasis = approxParameters["polybasis"]
elif not hasattr(self, "_polybasis") or self._polybasis is None:
self.polybasis = "MONOMIAL"
if "interpRcond" in keyList:
self.interpRcond = approxParameters["interpRcond"]
elif not hasattr(self, "interpRcond") or self.interpRcond is None:
self.interpRcond = None
if "robustTol" in keyList:
self.robustTol = approxParameters["robustTol"]
elif not hasattr(self, "_robustTol") or self._robustTol is None:
self.robustTol = 0
if "M" in keyList:
self.M = approxParameters["M"]
elif hasattr(self, "_M") and self.M is not None:
self.M = Mold
else:
self.M = 0
if "N" in keyList:
self.N = approxParameters["N"]
elif hasattr(self, "_N") and self.N is not None:
self.N = Nold
else:
self.N = 0
if "E" in keyList:
self.E = approxParameters["E"]
else:
self.E = min(self.S - 1, self.M + 1)
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Prescribed polybasis not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._sampleType = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def interpRcond(self):
"""Value of interpRcond."""
return self._interpRcond
@interpRcond.setter
def interpRcond(self, interpRcond):
self._interpRcond = interpRcond
self._approxParameters["interpRcond"] = self.interpRcond
@property
def M(self):
"""Value of M. Its assignment may change S."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
if hasattr(self, "_S") and self.S < self.M + 1:
RROMPyWarning("Prescribed S is too small. Updating S to M + 1.")
self.S = self.M + 1
@property
def N(self):
"""Value of N. Its assignment may change S."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
if hasattr(self, "_S") and self.S < self.N + 1:
RROMPyWarning("Prescribed S is too small. Updating S to N + 1.")
self.S = self.N + 1
@property
def E(self):
"""Value of E. Its assignment may change S."""
return self._E
@E.setter
def E(self, E):
if E < 0: raise RROMPyException("E must be non-negative.")
self._E = E
self._approxParameters["E"] = self.E
if hasattr(self, "_S") and self.S < self.E + 1:
RROMPyWarning("Prescribed S is too small. Updating S to E + 1.")
self.S = self.E + 1
@property
def robustTol(self):
"""Value of tolerance for robust Pade' denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
if S <= 0: raise RROMPyException("S must be positive.")
if hasattr(self, "_S"): Sold = self.S
else: Sold = -1
vals, label = [0] * 3, {0:"M", 1:"N", 2:"E"}
if hasattr(self, "_M") and self._M is not None: vals[0] = self.M
if hasattr(self, "_N") and self._N is not None: vals[1] = self.N
if hasattr(self, "_E") and self._E is not None: vals[2] = self.E
idxmax = np.argmax(vals)
if vals[idxmax] + 1 > S:
RROMPyWarning(("Prescribed S is too small. Updating S to {} + "
"1.").format(label[idxmax]))
self.S = vals[idxmax] + 1
else:
self._S = S
self._approxParameters["S"] = self.S
if Sold != self.S:
self.resetSamples()
def _setupDenominator(self):
"""Compute Pade' denominator."""
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of denominator.",
timestamp = self.timestamp)
while self.N > 0:
TE = polyvander[self.polybasis](self.radiusPade(self.mus), self.E,
scl = 1. / self.scaleFactor)
TE = (TE.T * self.ws).T
RHS = np.zeros(self.E + 1)
RHS[-1] = 1.
fitOut = customFit(TE.T, RHS, full = True,
rcond = self.interpRcond)
if self.verbosity >= 5:
condfit = fitOut[1][2][0] / fitOut[1][2][-1]
verbosityDepth("MAIN", ("Fitting {} samples with degree {} "
"through {}... Conditioning of LS "
"system: {:.4e}.").format(
self.S, self.E,
polyfitname[self.polybasis],
condfit),
timestamp = self.timestamp)
if fitOut[1][1] < self.E + 1:
Enew = fitOut[1][1] - 1
Nnew = min(self.N, Enew)
Mnew = min(self.M, Enew)
if Nnew == self.N:
strN = ""
else:
strN = "N from {} to {} and ".format(self.N, Nnew)
if Mnew == self.M:
strM = ""
else:
strM = "M from {} to {} and ".format(self.M, Mnew)
RROMPyWarning(("Polyfit is poorly conditioned.\nReducing {}{}"
"E from {} to {}.").format(strN, strM,
self.E, Enew))
newParams = {"N" : Nnew, "M" : Mnew, "E" : Enew}
self.approxParameters = newParams
continue
mus_un, idx_un, cnt_un = np.unique(self.mus, return_inverse = True,
return_counts = True)
TE = polyvander[self.polybasis](self.radiusPade(self.mus), self.N,
scl = 1. / self.scaleFactor)
TE = (TE.T * self.ws).T
if len(mus_un) == len(self.mus):
Ghalf = (TE.T * fitOut[0]).T
else:
pseudoInv = np.zeros((len(self.mus), len(self.mus)),
dtype = np.complex)
for j in range(len(mus_un)):
pseudoInv_loc = np.zeros((cnt_un[j], cnt_un[j]),
dtype = np.complex)
mask = np.arange(len(self.mus))[idx_un == j]
for der in range(cnt_un[j]):
fitderj = fitOut[0][mask[der]]
pseudoInv_loc = (pseudoInv_loc + fitderj
* np.diag(np.ones(1 + der),
k = der - cnt_un[j] + 1))
I = np.ix_(mask, mask)
pseudoInv[I] = np.flipud(pseudoInv_loc)
Ghalf = pseudoInv.dot(TE)
if self.POD:
self.Ghalf = self.samplingEngine.RPOD.dot(Ghalf)
ev, eV = self.findeveVGQR()
else:
self.Ghalf = self.samplingEngine.samples.dot(Ghalf)
ev, eV = self.findeveVGExplicit()
newParams = checkRobustTolerance(ev, self.E, self.robustTol)
if not newParams:
break
self.approxParameters = newParams
if self.N <= 0:
self._N = 0
eV = np.ones((1, 1))
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing denominator.",
timestamp = self.timestamp)
return eV[:, 0]
def _setupNumerator(self):
"""Compute Pade' numerator."""
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of numerator.",
timestamp = self.timestamp)
Qevaldiag = np.diag(self.trainedModel.getQVal(self.mus))
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
mus_un, idx_un, cnt_un = np.unique(self.mus, return_inverse = True,
return_counts = True)
for j in range(len(mus_un)):
if cnt_un[j] > 1:
Q_loc = np.zeros((cnt_un[j], cnt_un[j]), dtype = np.complex)
for der in range(1, cnt_un[j]):
Qderj = (self.trainedModel.getQVal(mus_un[j], der,
scl = 1. / self.scaleFactor)
/ fact(der))
Q_loc = Q_loc + Qderj * np.diag(np.ones(cnt_un[j] - der),
k = - der)
I = np.ix_(idx_un == j, idx_un == j)
Qevaldiag[I] = Qevaldiag[I] + Q_loc
self.trainedModel.verbosity = verb
while self.M >= 0:
fitVander = polyvander[self.polybasis](self.radiusPade(self.mus),
self.M,
scl = 1. / self.scaleFactor)
fitOut = customFit(fitVander, Qevaldiag, w = self.ws, full = True,
rcond = self.interpRcond)
if self.verbosity >= 5:
condfit = fitOut[1][2][0] / fitOut[1][2][-1]
verbosityDepth("MAIN", ("Fitting {} samples with degree {} "
"through {}... Conditioning of LS "
"system: {:.4e}.").format(
self.S, self.M,
polyfitname[self.polybasis],
condfit),
timestamp = self.timestamp)
if fitOut[1][1] == self.M + 1:
P = fitOut[0].T
break
RROMPyWarning(("Polyfit is poorly conditioned. Reducing M from {} "
"to {}. Exact snapshot interpolation not "
"guaranteed.").format(self.M, fitOut[1][1] - 1))
self.M = fitOut[1][1] - 1
if self.M <= 0:
raise RROMPyException(("Instability in computation of numerator. "
"Aborting."))
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing numerator.",
timestamp = self.timestamp)
return np.atleast_2d(P)
def setupApprox(self):
"""
Compute Pade' interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
modeAssert(self._mode, message = "Cannot setup approximant.")
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeScaleFactor()
self.computeSnapshots()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
- np.copy(self.samplingEngine.samples),
+ self.samplingEngine.samples,
self.HFEngine.rescalingExp)
data.polytype = self.polybasis
data.scaleFactor = self.scaleFactor
- data.mus = np.copy(self.mus)
+ data.mus = copy(self.mus)
self.trainedModel.data = data
else:
- self.trainedModel.data.projMat = np.copy(
- self.samplingEngine.samples)
+ self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
if self.N > 0:
Q = self._setupDenominator()
else:
Q = np.ones(1, dtype = np.complex)
- self.trainedModel.data.Q = np.copy(Q)
+ self.trainedModel.data.Q = copy(Q)
P = self._setupNumerator()
if self.POD:
P = self.samplingEngine.RPOD.dot(P)
- self.trainedModel.data.P = np.copy(P)
+ self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def findeveVGExplicit(self, verbOutput : int = 5) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of Pade' denominator
matrix.
"""
modeAssert(self._mode, message = "Cannot solve eigenvalue problem.")
if self.verbosity >= 10:
verbosityDepth("INIT", "Building gramian matrix.",
timestamp = self.timestamp)
self.G = self.HFEngine.innerProduct(self.Ghalf, self.Ghalf)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done building gramian.",
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving eigenvalue problem for gramian "
"matrix."), timestamp = self.timestamp)
ev, eV = np.linalg.eigh(self.G)
if self.verbosity >= verbOutput:
try: condev = ev[-1] / ev[0]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved eigenvalue problem of size {} "
"with condition number {:.4e}.").format(
self.N + 1, condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return ev, eV
def findeveVGQR(self, verbOutput : int = 5) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of Pade' denominator matrix
through SVD of R factor.
"""
if self.verbosity >= 7:
verbosityDepth("INIT", ("Solving svd for square root of gramian "
"matrix."), timestamp = self.timestamp)
_, s, eV = np.linalg.svd(self.Ghalf, full_matrices = False)
ev = s[::-1]
eV = eV[::-1, :].T.conj()
if self.verbosity >= verbOutput:
try: condev = s[0] / s[-1]
except: condev = np.inf
verbosityDepth("MAIN", ("Solved svd problem of size {} x {} with "
"condition number {:.4e}.").format(
self.S, self.N + 1, condev),
timestamp = self.timestamp)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done solving eigenvalue problem.",
timestamp = self.timestamp)
return ev, eV
def radiusPade(self, mu:Np1D, mu0 : float = None) -> float:
"""
Compute translated radius to be plugged into Pade' approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Translated radius to be plugged into Pade' approximant.
"""
return self.trainedModel.radiusPade(mu, mu0)
def getResidues(self) -> Np1D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues()
diff --git a/rrompy/reduction_methods/distributed/rb_distributed.py b/rrompy/reduction_methods/distributed/rb_distributed.py
index 7369320..a5fff99 100644
--- a/rrompy/reduction_methods/distributed/rb_distributed.py
+++ b/rrompy/reduction_methods/distributed/rb_distributed.py
@@ -1,216 +1,216 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from copy import copy
+from copy import deepcopy as copy
import numpy as np
from .generic_distributed_approximant import GenericDistributedApproximant
from rrompy.reduction_methods.trained_model import TrainedModelRB as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.reduction_methods.base.rb_utils import projectAffineDecomposition
from rrompy.utilities.base.types import Np1D, Np2D, List, Tuple, DictAny, HFEng
from rrompy.utilities.base import purgeDict, verbosityDepth
from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyException
__all__ = ['RBDistributed']
class RBDistributed(GenericDistributedApproximant):
"""
ROM RB approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'muBounds': list of bounds for parameter values; defaults to
[0, 1];
- 'S': total number of samples current approximant relies upon;
defaults to 2;
- 'sampler': sample point generator; defaults to uniform sampler on
muBounds;
- 'R': rank for Galerkin projection; defaults to S.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
ws: Array of snapshot weigths (unused).
homogeneized: Whether to homogeneize Dirichlet BCs.
approxRadius: Dummy radius of approximant (i.e. distance from mu0 to
farthest sample point).
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'muBounds': list of bounds for parameter values;
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator;
- 'R': rank for Galerkin projection.
extraApproxParameters: List of approxParameters keys in addition to
mother class's.
POD: Whether to compute POD of snapshots.
muBounds: list of bounds for parameter values.
S: Number of solution snapshots over which current approximant is
based upon.
sampler: Sample point generator.
R: Rank for Galerkin projection.
samplingEngine: Sampling engine.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
uApp: Last evaluated approximant as numpy complex vector.
As: List of sparse matrices (in CSC format) representing coefficients
of linear system matrix wrt theta(mu).
bs: List of numpy vectors representing coefficients of linear system
RHS wrt theta(mu).
thetaAs: List of callables representing coefficients of linear system
matrix wrt mu.
thetabs: List of callables representing coefficients of linear system
RHS wrt mu.
ARBs: List of sparse matrices (in CSC format) representing coefficients
of compressed linear system matrix wrt theta(mu).
bRBs: List of numpy vectors representing coefficients of compressed
linear system RHS wrt theta(mu).
"""
def __init__(self, HFEngine:HFEng, mu0 : complex = 0.,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["R"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
if self.verbosity >= 10:
verbosityDepth("INIT", "Computing affine blocks of system.",
timestamp = self.timestamp)
self.As = self.HFEngine.affineLinearSystemA(self.mu0)
self.bs = self.HFEngine.affineLinearSystemb(self.mu0,
self.homogeneized)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done computing affine blocks.",
timestamp = self.timestamp)
self._postInit()
@property
def approxParameters(self):
"""
Value of approximant parameters. Its assignment may change M, N and S.
"""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
approxParametersCopy = purgeDict(approxParameters, ["R"], True, True,
baselevel = 1)
GenericDistributedApproximant.approxParameters.fset(self,
approxParametersCopy)
keyList = list(approxParameters.keys())
if "R" in keyList:
self.R = approxParameters["R"]
elif hasattr(self, "_R") and self._R is not None:
self.R = self.R
else:
self.R = self.S
@property
def R(self):
"""Value of R. Its assignment may change S."""
return self._R
@R.setter
def R(self, R):
if R < 0: raise RROMPyException("R must be non-negative.")
self._R = R
self._approxParameters["R"] = self.R
if hasattr(self, "_S") and self.S < self.R:
RROMPyWarning("Prescribed S is too small. Updating S to R.")
self.S = self.R
def setupApprox(self):
"""Compute RB projection matrix."""
if self.checkComputedApprox():
return
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeSnapshots()
if self.verbosity >= 7:
verbosityDepth("INIT", "Computing projection matrix.",
timestamp = self.timestamp)
if self.POD:
U, _, _ = np.linalg.svd(self.samplingEngine.RPOD,
full_matrices = False)
pMat = self.samplingEngine.samples.dot(U[:, : self.R])
else:
pMat = self.samplingEngine.samples[:, : self.R]
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
- np.copy(pMat), self.HFEngine.rescalingExp)
+ pMat, self.HFEngine.rescalingExp)
data.thetaAs = self.HFEngine.affineWeightsA(self.mu0)
data.thetabs = self.HFEngine.affineWeightsb(self.mu0,
self.homogeneized)
data.ARBs, data.bRBs = self.assembleReducedSystem(pMat)
- data.mus = np.copy(self.mus)
+ data.mus = copy(self.mus)
self.trainedModel.data = data
else:
pMatOld = self.trainedModel.data.projMat
Sold = pMatOld.shape[1]
ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMatOld)
self.trainedModel.data.ARBs = ARBs
self.trainedModel.data.bRBs = bRBs
- self.trainedModel.data.projMat = np.copy(pMat)
+ self.trainedModel.data.projMat = copy(pMat)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing projection matrix.",
timestamp = self.timestamp)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def assembleReducedSystem(self, pMat : Np2D = None, pMatOld : Np2D = None)\
-> Tuple[List[Np2D], List[Np1D]]:
"""Build affine blocks of RB linear system through projections."""
if pMat is None:
self.setupApprox()
ARBs = self.trainedModel.data.ARBs
bRBs = self.trainedModel.data.bRBs
else:
if self.verbosity >= 10:
verbosityDepth("INIT", "Projecting affine terms of HF model.",
timestamp = self.timestamp)
ARBsOld = None if pMatOld is None else self.trainedModel.data.ARBs
bRBsOld = None if pMatOld is None else self.trainedModel.data.bRBs
ARBs, bRBs = projectAffineDecomposition(self.As, self.bs, pMat,
ARBsOld, bRBsOld, pMatOld)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done projecting affine terms.",
timestamp = self.timestamp)
return ARBs, bRBs
diff --git a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
index 3cd83b4..70b41d2 100644
--- a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
+++ b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py
@@ -1,555 +1,554 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from copy import copy
+from copy import deepcopy as copy
import numpy as np
from scipy.special import factorial as fact
from .generic_distributed_greedy_approximant import \
GenericDistributedGreedyApproximant
from rrompy.utilities.poly_fitting import (polybases, polyvander, polydomcoeff,
polyfitname, customFit)
from rrompy.reduction_methods.distributed import RationalInterpolant
from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import DictAny, List, HFEng
from rrompy.utilities.base import purgeDict, verbosityDepth
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['RationalInterpolantGreedy']
class RationalInterpolantGreedy(GenericDistributedGreedyApproximant,
RationalInterpolant):
"""
ROM greedy rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'muBounds': list of bounds for parameter values; defaults to
[0, 1];
- 'S': number of starting training points; defaults to 2;
- 'sampler': sample point generator; defaults to uniform sampler on
muBounds;
- 'basis': type of basis for interpolation; allowed values include
'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'Delta': difference between M and N in rational approximant;
defaults to 0;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'errorEstimatorKind': kind of error estimator; available values
include 'EXACT', 'SIMPLIFIED', 'BASIC', and 'BARE'; defaults to
'SIMPLIFIED';
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of training points to be exhausted
before training set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to maxIter /
refinementRatio;
- 'trainSetGenerator': training sample points generator; defaults
to Chebyshev sampler within muBounds;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
defaults to None;
- 'robustTol': tolerance for robust Pade' denominator management;
defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'muBounds': list of bounds for parameter values;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'basis': type of basis for interpolation;
- 'Delta': difference between M and N in rational approximant;
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'errorEstimatorKind': kind of error estimator;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of training points to be exhausted
before training set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
- 'robustTol': tolerance for robust Pade' denominator management.
extraApproxParameters: List of approxParameters keys in addition to
mother class's.
POD: whether to compute POD of snapshots.
muBounds: list of bounds for parameter values.
S: number of starting training points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
errorEstimatorKind: kind of error estimator.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
interpRcond: Tolerance for interpolation via numpy.polyfit.
robustTol: Tolerance for robust Pade' denominator management.
estimatorEnergyMatrix: matrix representing inner product for error
estimation.
samplingEngine: Sampling engine.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
uApp: Last evaluated approximant as numpy complex vector.
"""
_allowedEstimatorKinds = ["EXACT", "SIMPLIFIED", "BASIC", "BARE"]
def __init__(self, HFEngine:HFEng, mu0 : complex = 0.,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["polybasis", "Delta", "errorEstimatorKind",
"interpRcond", "robustTol"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
if self.verbosity >= 7:
verbosityDepth("INIT", "Computing Taylor blocks of system.",
timestamp = self.timestamp)
nAs = self.HFEngine.nAs - 1
nbs = max(self.HFEngine.nbs, (nAs + 1) * self.homogeneized)
self.As = [self.HFEngine.A(self.mu0, j + 1) for j in range(nAs)]
self.bs = [self.HFEngine.b(self.mu0, j, self.homogeneized)
for j in range(nbs)]
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing Taylor blocks.",
timestamp = self.timestamp)
self._postInit()
@property
def approxParameters(self):
"""
Value of approximant parameters. Its assignment may change robustTol.
"""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
approxParametersCopy = purgeDict(approxParameters, ["polybasis",
"Delta",
"errorEstimatorKind",
"interpRcond",
"robustTol"],
True, True, baselevel = 1)
if "Delta" in list(approxParameters.keys()):
self._Delta = approxParameters["Delta"]
elif not hasattr(self, "_Delta") or self._Delta is None:
self._Delta = 0
GenericDistributedGreedyApproximant.approxParameters.fset(self,
approxParametersCopy)
keyList = list(approxParameters.keys())
self.Delta = self.Delta
if "polybasis" in keyList:
self.polybasis = approxParameters["polybasis"]
elif not hasattr(self, "_polybasis") or self._polybasis is None:
self.polybasis = "MONOMIAL"
if "errorEstimatorKind" in keyList:
self.errorEstimatorKind = approxParameters["errorEstimatorKind"]
elif (not hasattr(self, "_errorEstimatorKind")
or self.errorEstimatorKind is None):
self.errorEstimatorKind = "SIMPLIFIED"
if "interpRcond" in keyList:
self.interpRcond = approxParameters["interpRcond"]
elif not hasattr(self, "interpRcond") or self.interpRcond is None:
self.interpRcond = None
if "robustTol" in keyList:
self.robustTol = approxParameters["robustTol"]
elif not hasattr(self, "_robustTol") or self._robustTol is None:
self.robustTol = 0
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Sample type not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def Delta(self):
"""Value of Delta."""
return self._Delta
@Delta.setter
def Delta(self, Delta):
if not np.isclose(Delta, np.floor(Delta)):
raise RROMPyException("Delta must be an integer.")
if Delta < 0:
RROMPyWarning(("Error estimator unreliable for Delta < 0. "
"Overloading of errorEstimator is suggested."))
else:
Deltamin = (max(self.HFEngine.nbs,
self.HFEngine.nAs * self.homogeneized)
- 1 - 1 * (self.HFEngine.nAs > 1))
if Delta < Deltamin:
RROMPyWarning(("Method may be unreliable for selected Delta. "
"Suggested minimal value of Delta: {}.").format(
Deltamin))
self._Delta = Delta
self._approxParameters["Delta"] = self.Delta
@property
def errorEstimatorKind(self):
"""Value of errorEstimatorKind."""
return self._errorEstimatorKind
@errorEstimatorKind.setter
def errorEstimatorKind(self, errorEstimatorKind):
errorEstimatorKind = errorEstimatorKind.upper()
if errorEstimatorKind not in self._allowedEstimatorKinds:
RROMPyWarning(("Error estimator kind not recognized. Overriding "
"to 'SIMPLIFIED'."))
errorEstimatorKind = "SIMPLIFIED"
self._errorEstimatorKind = errorEstimatorKind
self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind
@property
def nTestPoints(self):
"""Value of nTestPoints."""
return self._nTestPoints
@nTestPoints.setter
def nTestPoints(self, nTestPoints):
if nTestPoints <= np.abs(self.Delta):
RROMPyWarning(("nTestPoints must be at least abs(Delta) + 1. "
"Increasing value to abs(Delta) + 1."))
nTestPoints = np.abs(self.Delta) + 1
if not np.isclose(nTestPoints, np.int(nTestPoints)):
raise RROMPyException("nTestPoints must be an integer.")
nTestPoints = np.int(nTestPoints)
if hasattr(self, "_nTestPoints") and self.nTestPoints is not None:
nTestPointsold = self.nTestPoints
else: nTestPointsold = -1
self._nTestPoints = nTestPoints
self._approxParameters["nTestPoints"] = self.nTestPoints
if nTestPointsold != self.nTestPoints:
self.resetSamples()
def errorEstimator(self, mus:List[np.complex]) -> List[np.complex]:
"""Standard residual-based error estimator."""
self.setupApprox()
PM = self.trainedModel.data.P[:, -1]
if np.any(np.isnan(PM)) or np.any(np.isinf(PM)):
err = np.empty(len(mus))
err[:] = np.inf
return err
nAs = self.HFEngine.nAs - 1
nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized)
S = len(self.mus)
muRTest = self.radiusPade(mus)
muRTrain = self.radiusPade(self.mus)
nodalVals = np.prod(np.tile(muRTest.reshape(-1, 1), [1, S])
- muRTrain.reshape(1, -1), axis = 1)
denVals = self.trainedModel.getQVal(mus)
self.assembleReducedResidualBlocks(kind = self.errorEstimatorKind)
vanderBase = np.polynomial.polynomial.polyvander(muRTest,
max(nAs, nbs)).T
radiusb0 = vanderBase[: nbs + 1, :]
# 'ij,jk,ik->k', resbb, radiusb0, radiusb0.conj()
b0resb0 = np.sum(self.trainedModel.data.resbb.dot(radiusb0)
* radiusb0.conj(), axis = 0)
RHSnorms = np.power(np.abs(b0resb0), .5)
if self.errorEstimatorKind == "BARE":
self.assembleReducedResidualGramian(self.trainedModel.data.projMat)
pDom = self.trainedModel.data.P[:, -1]
LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom))
Adiag = self.As[0].diagonal()
LL = ((self.scaleFactor * np.linalg.norm(Adiag)) ** 2.
/ np.size(Adiag) * LL)
elif self.errorEstimatorKind == "BASIC":
pDom = self.trainedModel.data.P[:, -1]
LL = pDom.conj().dot(self.trainedModel.data.resAA.dot(pDom))
else:
vanderBase = vanderBase[: -1, :]
delta = S - len(self.trainedModel.data.Q)
nbsEff = max(0, nbs - delta)
if self.errorEstimatorKind == "SIMPLIFIED":
radiusA = np.tensordot(PM, vanderBase[: nAs, :], 0)
if delta == 0:
radiusb = (np.abs(self.trainedModel.data.Q[-1])
* radiusb0[: -1, :])
else: #if self.errorEstimatorKind == "EXACT":
momentQ = np.zeros(nbsEff, dtype = np.complex)
momentQu = np.zeros((S, nAs), dtype = np.complex)
radiusbTen = np.zeros((nbsEff, nbsEff, len(mus)),
dtype = np.complex)
radiusATen = np.zeros((nAs, nAs, len(mus)), dtype = np.complex)
if nbsEff > 0:
momentQ[0] = self.trainedModel.data.Q[-1]
radiusbTen[0, :, :] = vanderBase[: nbsEff, :]
momentQu[:, 0] = self.trainedModel.data.P[:, -1]
radiusATen[0, :, :] = vanderBase[: nAs, :]
Qvals = self.trainedModel.getQVal(self.mus)
for k in range(1, max(nAs, nbs * (nbsEff > 0))):
Qvals = Qvals * muRTrain
if k > delta and k < nbs:
momentQ[k - delta] = self._fitinv.dot(Qvals)
radiusbTen[k - delta, k :, :] = (
radiusbTen[0, : delta - k, :])
if k < nAs:
momentQu[:, k] = Qvals * self._fitinv
radiusATen[k, k :, :] = radiusATen[0, : - k, :]
if self.POD and nAs > 1:
momentQu[:, 1 :] = self.samplingEngine.RPOD.dot(
momentQu[:, 1 :])
radiusA = np.tensordot(momentQu, radiusATen, 1)
if nbsEff > 0:
radiusb = np.tensordot(momentQ, radiusbTen, 1)
if ((self.errorEstimatorKind == "SIMPLIFIED" and delta == 0)
or (self.errorEstimatorKind == "EXACT" and nbsEff > 0)):
# 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
ff = np.sum(self.trainedModel.data.resbb[delta + 1 :, delta + 1 :]\
.dot(radiusb) * radiusb.conj(), axis = 0)
# 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
Lf = np.sum(np.tensordot(
self.trainedModel.data.resAb[delta :, :, :], radiusA, 2)
* radiusb.conj(), axis = 0)
else:
ff, Lf = 0., 0.
if self.errorEstimatorKind not in ["BARE", "BASIC"]:
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2)
* radiusA.conj(), axis = (0, 1))
jOpt = np.power(np.abs(ff - 2. * np.real(Lf) + LL), .5)
return (polydomcoeff[self.polybasis](S - 1) * jOpt
* np.abs(nodalVals / denVals) / RHSnorms)
def _setupDenominator(self):
"""Compute Pade' denominator."""
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of denominator.",
timestamp = self.timestamp)
S = len(self.mus)
TS = polyvander[self.polybasis](self.radiusPade(self.mus), S - 1).T
RHS = np.zeros(S)
RHS[-1] = 1.
fitOut = customFit(TS, RHS, full = True, rcond = self.interpRcond)
if self.verbosity >= 2:
condfit = fitOut[1][2][0] / fitOut[1][2][-1]
verbosityDepth("MAIN", ("Fitting {} samples with degree {} "
"through {}... Conditioning of system: "
"{:.4e}.").format(S, S - 1,
polyfitname[self.polybasis],
condfit),
timestamp = self.timestamp)
if fitOut[1][1] < S:
RROMPyWarning(("Polyfit is poorly conditioned. Starting "
"preemptive termination of computation of "
"approximant."))
Q = np.empty(max(self.N, 0) + 1, dtype = np.complex)
P = np.empty((len(self.mus), max(self.M, 0) + 1),
dtype = np.complex)
Q[:] = np.nan
P[:] = np.nan
- self.trainedModel.data.Q = np.copy(Q)
- self.trainedModel.data.P = np.copy(P)
+ self.trainedModel.data.Q = copy(Q)
+ self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
if self.verbosity >= 7:
verbosityDepth("DEL", "Aborting computation of denominator.",
timestamp = self.timestamp)
return
self._fitinv = fitOut[0]
while self.N > 0:
Ghalf = (TS[: self.N + 1, :] * self._fitinv).T
if self.POD:
self.Ghalf = self.samplingEngine.RPOD.dot(Ghalf)
ev, eV = self.findeveVGQR(2)
else:
self.Ghalf = self.samplingEngine.samples.dot(Ghalf)
ev, eV = self.findeveVGQR(2)
Nstable = np.sum(np.abs(ev) >= self.robustTol * np.linalg.norm(ev))
if self.N <= Nstable: break
if self.verbosity >= 2:
verbosityDepth("MAIN", ("Smallest {} eigenvalues below "
"tolerance. Reducing N to {}.")\
.format(self.N - Nstable + 1, Nstable),
timestamp = self.timestamp)
self._N = Nstable
if self.N <= 0:
self._N = 0
eV = np.ones((1, 1))
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing denominator.",
timestamp = self.timestamp)
return eV[:, 0]
def _setupNumerator(self):
"""Compute Pade' numerator."""
if self.verbosity >= 7:
verbosityDepth("INIT", "Starting computation of numerator.",
timestamp = self.timestamp)
Qevaldiag = np.diag(self.trainedModel.getQVal(self.mus))
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
mus_un, idx_un, cnt_un = np.unique(self.mus, return_inverse = True,
return_counts = True)
for j in range(len(mus_un)):
if cnt_un[j] > 1:
Q_loc = np.zeros((cnt_un[j], cnt_un[j]), dtype = np.complex)
for der in range(1, cnt_un[j]):
Qderj = (self.trainedModel.getQVal(mus_un[j], der)
/ fact(der))
Q_loc = Q_loc + Qderj * np.diag(np.ones(cnt_un[j] - der),
k = - der)
I = idx_un == j
I = np.arange(len(self.mus))[I]
I = np.ix_(I, I)
Qevaldiag[I] = Qevaldiag[I] + Q_loc
self.trainedModel.verbosity = verb
while self.M >= 0:
fitVander = polyvander[self.polybasis](self.radiusPade(self.mus),
self.M)
w = None
S = len(self.mus)
if self.M == S - 1: w = "AUTO"
fitOut = customFit(fitVander, Qevaldiag, full = True, w = w,
rcond = self.interpRcond)
if self.verbosity >= 2:
condfit = fitOut[1][2][0] / fitOut[1][2][-1]
verbosityDepth("MAIN", ("Fitting {} samples with degree {} "
"through {}... Conditioning of "
"system: {:.4e}.").format(
S, self.M,
polyfitname[self.polybasis],
condfit),
timestamp = self.timestamp)
if fitOut[1][1] == self.M + 1:
P = fitOut[0].T
break
RROMPyWarning(("Polyfit is poorly conditioned. Reducing M from {} "
"to {}. Exact snapshot interpolation not "
"guaranteed.").format(self.M, fitOut[1][1] - 1))
self._M = fitOut[1][1] - 1
if self.M < 0:
raise RROMPyException(("Instability in computation of numerator. "
"Aborting."))
return np.atleast_2d(P)
def setupApprox(self, plotEst : bool = False):
"""
Compute Pade' interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.computeScaleFactor()
self.greedy(plotEst)
S = len(self.mus)
self._M = S - 1
self._N = S - 1
if self.Delta < 0:
self._M += self.Delta
else:
self._N -= self.Delta
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
- np.copy(self.samplingEngine.samples),
+ self.samplingEngine.samples,
self.HFEngine.rescalingExp)
data.polytype = self.polybasis
data.scaleFactor = self.scaleFactor
- data.mus = np.copy(self.mus)
+ data.mus = copy(self.mus)
self.trainedModel.data = data
else:
- self.trainedModel.data.projMat = np.copy(
- self.samplingEngine.samples)
- self.trainedModel.data.mus = np.copy(self.mus)
+ self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
+ self.trainedModel.data.mus = copy(self.mus)
if min(self.M, self.N) < 0:
if self.verbosity >= 5:
verbosityDepth("MAIN", "Minimal sample size not achieved.",
timestamp = self.timestamp)
Q = np.empty(max(self.N, 0) + 1, dtype = np.complex)
P = np.empty((len(self.mus), max(self.M, 0) + 1),
dtype = np.complex)
Q[:] = np.nan
P[:] = np.nan
- self.trainedModel.data.Q = np.copy(Q)
- self.trainedModel.data.P = np.copy(P)
+ self.trainedModel.data.Q = copy(Q)
+ self.trainedModel.data.P = copy(P)
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Aborting computation of approximant.",
timestamp = self.timestamp)
return
if self.N > 0:
Q = self._setupDenominator()
if Q is None:
if self.verbosity >= 5:
verbosityDepth("DEL",
"Aborting computation of approximant.",
timestamp = self.timestamp)
return
else:
Q = np.ones((1,), dtype = np.complex)
- self.trainedModel.data.Q = np.copy(Q)
+ self.trainedModel.data.Q = copy(Q)
P = self._setupNumerator()
if self.POD:
P = self.samplingEngine.RPOD.dot(P)
- self.trainedModel.data.P = np.copy(P)
+ self.trainedModel.data.P = copy(P)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing numerator.",
timestamp = self.timestamp)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def assembleReducedResidualBlocks(self, kind : str = "EXACT"):
"""Build affine blocks of reduced linear system through projections."""
pMat = self.trainedModel.data.projMat
scaling = self.trainedModel.data.scaleFactor
self.assembleReducedResidualBlocksbb(self.bs, pMat, scaling)
if kind in ["EXACT", "SIMPLIFIED"]:
self.assembleReducedResidualBlocksAb(self.As, self.bs[1 :],
pMat, scaling)
if kind != "BARE":
self.assembleReducedResidualBlocksAA(self.As, pMat, scaling,
basic = (kind == "BASIC"))
diff --git a/rrompy/reduction_methods/distributed_greedy/rb_distributed_greedy.py b/rrompy/reduction_methods/distributed_greedy/rb_distributed_greedy.py
index 54fead4..ff46ba2 100644
--- a/rrompy/reduction_methods/distributed_greedy/rb_distributed_greedy.py
+++ b/rrompy/reduction_methods/distributed_greedy/rb_distributed_greedy.py
@@ -1,250 +1,250 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
-from copy import copy
+from copy import deepcopy as copy
from .generic_distributed_greedy_approximant import \
GenericDistributedGreedyApproximant
from rrompy.reduction_methods.distributed import RBDistributed
from rrompy.reduction_methods.trained_model import TrainedModelRB as tModel
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import DictAny, HFEng, List
from rrompy.utilities.base import verbosityDepth
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['RBDistributedGreedy']
class RBDistributedGreedy(GenericDistributedGreedyApproximant, RBDistributed):
"""
ROM greedy RB approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'muBounds': list of bounds for parameter values; defaults to
[0, 1];
- 'S': number of starting training points; defaults to 2;
- 'sampler': sample point generator; defaults to uniform sampler on
muBounds;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of training points to be exhausted
before training set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to maxIter /
refinementRatio;
- 'trainSetGenerator': training sample points generator; defaults
to Chebyshev sampler within muBounds.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterList: Recognized keys of approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'muBounds': list of bounds for parameter values;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of training points to be exhausted
before training set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator.
extraApproxParameters: List of approxParameters keys in addition to
mother class's.
POD: whether to compute POD of snapshots.
muBounds: list of bounds for parameter values.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
estimatorEnergyMatrix: matrix representing inner product for error
estimation.
samplingEngine: Sampling engine.
uHF: High fidelity solution with wavenumber lastSolvedHF as numpy
complex vector.
lastSolvedHF: Wavenumber corresponding to last computed high fidelity
solution.
uApp: Last evaluated approximant as numpy complex vector.
As: List of sparse matrices (in CSC format) representing coefficients
of linear system matrix wrt theta(mu).
bs: List of numpy vectors representing coefficients of linear system
RHS wrt theta(mu).
thetaAs: List of callables representing coefficients of linear system
matrix wrt mu.
thetabs: List of callables representing coefficients of linear system
RHS wrt mu.
ARBs: List of sparse matrices (in CSC format) representing coefficients
of compressed linear system matrix wrt theta(mu).
bRBs: List of numpy vectors representing coefficients of compressed
linear system RHS wrt theta(mu).
"""
def __init__(self, HFEngine:HFEng, mu0 : complex = 0.,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
if self.verbosity >= 10:
verbosityDepth("INIT", "Computing affine blocks of system.",
timestamp = self.timestamp)
self.As = self.HFEngine.affineLinearSystemA(self.mu0)
self.bs = self.HFEngine.affineLinearSystemb(self.mu0,
self.homogeneized)
if self.verbosity >= 10:
verbosityDepth("DEL", "Done computing affine blocks.",
timestamp = self.timestamp)
self._postInit()
@property
def R(self):
"""Value of R."""
return self._S
@R.setter
def R(self, R):
raise RROMPyException(("R is used just to simplify inheritance, and "
"its value cannot be changed from that of S."))
def errorEstimator(self, mus:List[np.complex]) -> List[np.complex]:
"""
Standard residual-based error estimator. Unreliable for unstable
problems (inf-sup constant is missing).
"""
self.setupApprox()
self.assembleReducedResidualBlocks()
nmus = len(mus)
nAs = self.trainedModel.data.resAA.shape[1]
nbs = self.trainedModel.data.resbb.shape[0]
thetaAs = self.trainedModel.data.thetaAs
thetabs = self.trainedModel.data.thetabs
radiusA = np.empty((len(self.mus), nAs, nmus), dtype = np.complex)
radiusb = np.empty((nbs, nmus), dtype = np.complex)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
if verb >= 5:
mustr = mus
if nmus > 2:
mustr = "[{} ..({}).. {}]".format(mus[0], nmus - 2,
mus[-1])
verbosityDepth("INIT", ("Computing RB solution at mu = "
"{}.").format(mustr),
timestamp = self.timestamp)
for j in range(nmus):
mu = mus[j]
uApp = self.getApproxReduced(mu)
for i in range(nAs):
radiusA[:, i, j] = eval(thetaAs[i]) * uApp
for i in range(nbs):
radiusb[i, j] = eval(thetabs[i])
if verb >= 5:
verbosityDepth("DEL", "Done computing RB solution.",
timestamp = self.timestamp)
self.trainedModel.verbosity = verb
# 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
ff = np.sum(self.trainedModel.data.resbb.dot(radiusb) * radiusb.conj(),
axis = 0)
# 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, radiusA, 2)
* radiusb.conj(), axis = 0)
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2)
* radiusA.conj(), axis = (0, 1))
return np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5
def setupApprox(self, plotEst : bool = False):
"""Compute RB projection matrix."""
if self.checkComputedApprox():
return
if self.verbosity >= 5:
verbosityDepth("INIT", "Setting up {}.". format(self.name()),
timestamp = self.timestamp)
self.greedy(plotEst)
if self.verbosity >= 7:
verbosityDepth("INIT", "Computing projection matrix.",
timestamp = self.timestamp)
pMat = self.samplingEngine.samples
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
- np.copy(pMat), self.HFEngine.rescalingExp)
+ pMat, self.HFEngine.rescalingExp)
data.thetaAs = self.HFEngine.affineWeightsA(self.mu0)
data.thetabs = self.HFEngine.affineWeightsb(self.mu0,
self.homogeneized)
data.ARBs, data.bRBs = self.assembleReducedSystem(pMat)
- data.mus = np.copy(self.mus)
+ data.mus = copy(self.mus)
self.trainedModel.data = data
else:
pMatOld = self.trainedModel.data.projMat
Sold = pMatOld.shape[1]
ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMatOld)
self.trainedModel.data.ARBs = ARBs
self.trainedModel.data.bRBs = bRBs
- self.trainedModel.data.projMat = np.copy(pMat)
- self.trainedModel.data.mus = np.copy(self.mus)
+ self.trainedModel.data.projMat = copy(pMat)
+ self.trainedModel.data.mus = copy(self.mus)
if self.verbosity >= 7:
verbosityDepth("DEL", "Done computing projection matrix.",
timestamp = self.timestamp)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
if self.verbosity >= 5:
verbosityDepth("DEL", "Done setting up approximant.",
timestamp = self.timestamp)
def assembleReducedResidualBlocks(self):
"""Build affine blocks of RB linear system through projections."""
computeResbb = not hasattr(self.trainedModel.data, "resbb")
computeResAb = (not hasattr(self.trainedModel.data, "resAb")
or self.trainedModel.data.resAb.shape[1] != len(self.mus))
computeResAA = (not hasattr(self.trainedModel.data, "resAA")
or self.trainedModel.data.resAA.shape[0] != len(self.mus))
if computeResbb or computeResAb or computeResAA:
pMat = self.trainedModel.data.projMat
if self.verbosity >= 7:
verbosityDepth("INIT", "Projecting affine terms of residual.",
timestamp = self.timestamp)
if computeResbb:
self.assembleReducedResidualBlocksbb(self.bs, pMat)
if computeResAb:
self.assembleReducedResidualBlocksAb(self.As, self.bs, pMat)
if computeResAA:
self.assembleReducedResidualBlocksAA(self.As, pMat)
if self.verbosity >= 7:
verbosityDepth("DEL", ("Done setting up affine decomposition "
"of residual."),
timestamp = self.timestamp)
diff --git a/rrompy/reduction_methods/trained_model/trained_model_data.py b/rrompy/reduction_methods/trained_model/trained_model_data.py
index 82cd8fd..817e3f1 100644
--- a/rrompy/reduction_methods/trained_model/trained_model_data.py
+++ b/rrompy/reduction_methods/trained_model/trained_model_data.py
@@ -1,31 +1,32 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
+from copy import deepcopy as copy
from rrompy.utilities.base.types import Np2D
__all__ = ['TrainedModelData']
class TrainedModelData:
"""ROM approximant evaluation data (must be pickle-able)."""
def __init__(self, name:str, mu0:complex, projMat:Np2D,
rescalingExp : float = 1.):
self.name = name
self.mu0 = mu0
- self.projMat = projMat
+ self.projMat = copy(projMat)
self.rescalingExp = rescalingExp
diff --git a/rrompy/sampling/base/pod_engine.py b/rrompy/sampling/base/pod_engine.py
index ece593a..924d291 100644
--- a/rrompy/sampling/base/pod_engine.py
+++ b/rrompy/sampling/base/pod_engine.py
@@ -1,151 +1,151 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
-from copy import copy
+from copy import deepcopy as copy
from rrompy.utilities.base.types import Np1D, Np2D, Tuple, HFEng
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['PODEngine']
class PODEngine:
"""
POD engine for general matrix orthogonalization.
"""
def __init__(self, HFEngine:HFEng):
self.HFEngine = HFEngine
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def norm(self, a:Np1D) -> float:
"""Compute norm of a Hilbert space object."""
pass
def GS(self, a:Np1D, Q:Np2D, n : int = None,
aA:Np1D = None, QA:Np2D = None) -> Tuple[Np1D, Np1D, Np1D]:
"""
Compute 1 Gram-Schmidt step with given projector.
Args:
a: vector to be projected;
Q: orthogonal projection matrix;
n: number of columns of Q to be considered;
aA: augmented components of vector to be projected;
QA: augmented components of projection matrix.
Returns:
Resulting normalized vector, coefficients of a wrt the updated
basis.
"""
if n is None:
n = Q.shape[1]
if aA is None != QA is None:
raise RROMPyException(("Either both or none of augmented "
"components must be provided."))
r = np.zeros((n + 1,), dtype = a.dtype)
if n > 0:
Q = Q[:, : n]
for j in range(2): # twice is enough!
nu = self.HFEngine.innerProduct(a, Q)
a = a - Q.dot(nu)
if aA is not None:
aA = aA - QA.dot(nu)
r[:-1] = r[:-1] + nu
r[-1] = self.HFEngine.norm(a)
if np.isclose(np.abs(r[-1]), 0.):
r[-1] = 1.
a = a / r[-1]
if aA is not None:
aA = aA / r[-1]
return a, r, aA
def QRGramSchmidt(self, A:Np2D,
only_R : bool = False) -> Tuple[Np1D, Np1D]:
"""
Compute QR decomposition of a matrix through Gram-Schmidt method.
Args:
A: matrix to be decomposed;
only_R(optional): whether to skip reconstruction of Q; defaults to
False.
Returns:
Resulting orthogonal and upper-triangular factors.
"""
N = A.shape[1]
Q = np.zeros_like(A, dtype = A.dtype)
R = np.zeros((N, N), dtype = A.dtype)
for k in range(N):
Q[:, k], R[: k + 1, k], _ = self.GS(A[:, k], Q, k)
if only_R:
return R
return Q, R
def QRHouseholder(self, A:Np2D, Q0 : Np2D = None,
only_R : bool = False) -> Tuple[Np1D, Np1D]:
"""
Compute QR decomposition of a matrix through Householder method.
Args:
A: matrix to be decomposed;
Q0(optional): initial orthogonal guess for Q; defaults to random;
only_R(optional): whether to skip reconstruction of Q; defaults to
False.
Returns:
Resulting (orthogonal and )upper-triangular factor(s).
"""
B = copy(A)
N = B.shape[1]
V = np.zeros_like(B, dtype = B.dtype)
R = np.zeros((N, N), dtype = B.dtype)
if Q0 is None:
Q = np.zeros_like(B, dtype = B.dtype) + np.random.randn(*(B.shape))
else:
Q = copy(Q0)
for k in range(N):
if Q0 is None:
Q[:, k], _, _ = self.GS(Q[:, k], Q, k)
a = B[:, k]
R[k, k] = self.HFEngine.norm(a)
alpha = self.HFEngine.innerProduct(a, Q[:, k])
if np.isclose(np.abs(alpha), 0.): s = 1.
else: s = - alpha / np.abs(alpha)
Q[:, k] = s * Q[:, k]
V[:, k], _, _ = self.GS(R[k, k] * Q[:, k] - a, Q, k)
J = np.arange(k + 1, N)
vtB = self.HFEngine.innerProduct(B[:, J], V[:, k])
B[:, J] = B[:, J] - 2 * np.outer(V[:, k], vtB)
R[k, J] = self.HFEngine.innerProduct(B[:, J], Q[:, k])
B[:, J] = B[:, J] - np.outer(Q[:, k], R[k, J])
if only_R:
return R
for k in range(N - 1, -1, -1):
J = np.arange(k, N)
vtQ = self.HFEngine.innerProduct(Q[:, J], V[:, k])
Q[:, J] = Q[:, J] - 2 * np.outer(V[:, k], vtQ)
return Q, R
diff --git a/rrompy/sampling/linear_problem/sampling_engine_arnoldi.py b/rrompy/sampling/linear_problem/sampling_engine_arnoldi.py
index a231241..9fab378 100644
--- a/rrompy/sampling/linear_problem/sampling_engine_arnoldi.py
+++ b/rrompy/sampling/linear_problem/sampling_engine_arnoldi.py
@@ -1,143 +1,143 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from copy import copy
+from copy import deepcopy as copy
import numpy as np
from rrompy.sampling.base.pod_engine import PODEngine
from .sampling_engine_krylov import SamplingEngineKrylov
from rrompy.utilities.base.types import Np1D
from rrompy.utilities.base import verbosityDepth
__all__ = ['SamplingEngineArnoldi']
class SamplingEngineArnoldi(SamplingEngineKrylov):
"""HERE"""
def resetHistory(self):
super().resetHistory()
self.HArnoldi = None
self.RArnoldi = None
self.RHSs = None
self.samplesAug = None
def popSample(self):
if hasattr(self, "nsamples") and self.nsamples > 1:
self.HArnoldi = self.HArnoldi[: -1, : -1]
self.RArnoldi = self.RArnoldi[: -1, : -1]
if self.nsamples > 2:
self.RHSs = self.RHSs[:, : -1]
else:
self.RHSs = None
self.samplesAug = self.RHSs[self.HFEngine.spacedim() :, : -1]
super().popSample()
@property
def HFEngine(self):
"""Value of HFEngine. Its assignment resets history."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
self._HFEngine = HFEngine
self.resetHistory()
self.PODEngine = PODEngine(self._HFEngine)
def preprocesssamples(self):
ns = self.nsamples
if ns <= 0: return
return self.samplesAug[:, ns - 1].reshape((-1,
self.HFEngine.spacedim())).T
def preprocessb(self, mu:complex, overwrite : bool = False,
homogeneized : bool = False):
ns = self.nsamples
r = super().preprocessb(mu, overwrite, homogeneized)
if ns == 0:
return r
elif ns == 1:
r = r / self.RArnoldi[0, 0]
else:
r = ((r - self.RHSs[:, :ns-1].dot(self.RArnoldi[:ns-1, ns-1]))
/ self.RArnoldi[ns-1, ns-1])
if overwrite:
self.RHSs[:, ns - 1] = r
else:
if ns == 1:
self.RHSs = r.reshape((- 1, 1))
else:
self.RHSs = np.hstack((self.RHSs, r[:, None]))
return r
def postprocessu(self, u:Np1D, overwrite : bool = False):
if self.verbosity >= 10:
verbosityDepth("INIT", "Starting orthogonalization.",
timestamp = self.timestamp)
ns = self.nsamples
nsAug = (ns + 1) * self.HFEngine.spacedim()
if ns == 0:
u, h, _ = self.PODEngine.GS(u, np.empty((0, 0)))
r = h[0]
uAug = copy(u)
else:
uAug = np.concatenate((self.samplesAug[self.HFEngine.spacedim()
- nsAug :, ns - 1],
u), axis = None)
u, h, uAug = self.PODEngine.GS(u, self.samples[:, : ns], ns, uAug,
self.samplesAug[- nsAug :, : ns])
if overwrite:
self.HArnoldi[: ns + 1, ns] = h
if ns > 0:
r = self.HArnoldi[: ns + 1, 1 : ns + 1].dot(
self.RArnoldi[: ns, ns - 1])
self.RArnoldi[: ns + 1, ns] = r
self.samplesAug[- nsAug :, ns] = uAug
else:
if ns == 0:
self.HArnoldi = h.reshape((1, 1))
self.RArnoldi = r.reshape((1, 1))
self.samplesAug = uAug.reshape((-1, 1))
else:
self.HArnoldi=np.block([[ self.HArnoldi, h[:-1, None]],
[np.zeros((1, ns)), h[-1]]])
if ns > 0:
r = self.HArnoldi[: ns + 1, 1 : ns + 1].dot(
self.RArnoldi[: ns, ns - 1])
self.RArnoldi=np.block([[ self.RArnoldi, r[:-1, None]],
[np.zeros((1, ns)), r[-1]]])
self.samplesAug=np.vstack((np.zeros((self.HFEngine.spacedim(),
ns)),
self.samplesAug))
self.samplesAug = np.hstack((self.samplesAug, uAug[:, None]))
if self.verbosity >= 10:
verbosityDepth("DEL", "Done orthogonalizing.",
timestamp = self.timestamp)
return u
def preallocateSamples(self, u:Np1D, mu:np.complex, n:int):
super().preallocateSamples(u, mu, n)
h = self.HArnoldi
r = self.RArnoldi
saug = self.samplesAug
self.HArnoldi = np.zeros((n, n), dtype = u.dtype)
self.HArnoldi[0, 0] = h[0, 0]
self.RArnoldi = np.zeros((n, n), dtype = u.dtype)
self.RArnoldi[0, 0] = r[0, 0]
self.RHSs = np.empty((u.size, n - 1), dtype = u.dtype)
self.samplesAug = np.zeros((self.HFEngine.spacedim() * (n + 1), n),
dtype = u.dtype)
self.samplesAug[- self.HFEngine.spacedim() :, 0] = saug[:, 0]
diff --git a/rrompy/utilities/base/__init__.py b/rrompy/utilities/base/__init__.py
index 016678d..c236f15 100644
--- a/rrompy/utilities/base/__init__.py
+++ b/rrompy/utilities/base/__init__.py
@@ -1,43 +1,46 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from .find_dict_str_key import findDictStrKey
from .get_new_filename import getNewFilename
+from .pickle_utilities import pickleDump, pickleLoad
from .purge_dict import purgeDict
from .purge_list import purgeList
from .number_theory import (squareResonances, primeFactorize,
getLowestPrimeFactor)
from .sobol import sobolGenerate
from .low_discrepancy import vanderCorput, lowDiscrepancy
from . import types as Types
from .verbosity_depth import verbosityDepth
__all__ = [
'findDictStrKey',
'getNewFilename',
+ 'pickleDump',
+ 'pickleLoad',
'purgeDict',
'purgeList',
'squareResonances',
'primeFactorize',
'getLowestPrimeFactor',
'sobolGenerate',
'Types',
'verbosityDepth'
]
diff --git a/rrompy/reduction_methods/trained_model/trained_model_data.py b/rrompy/utilities/base/pickle_utilities.py
similarity index 64%
copy from rrompy/reduction_methods/trained_model/trained_model_data.py
copy to rrompy/utilities/base/pickle_utilities.py
index 82cd8fd..d697f83 100644
--- a/rrompy/reduction_methods/trained_model/trained_model_data.py
+++ b/rrompy/utilities/base/pickle_utilities.py
@@ -1,31 +1,28 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from rrompy.utilities.base.types import Np2D
+import pickle
-__all__ = ['TrainedModelData']
+def pickleDump(what, filename, byte : bool = True):
+ with open(filename, "w" + "b" * byte) as fileOut:
+ pickle.dump(what, fileOut)
-class TrainedModelData:
- """ROM approximant evaluation data (must be pickle-able)."""
- def __init__(self, name:str, mu0:complex, projMat:Np2D,
- rescalingExp : float = 1.):
- self.name = name
- self.mu0 = mu0
- self.projMat = projMat
- self.rescalingExp = rescalingExp
+def pickleLoad(filename, byte : bool = True):
+ with open(filename, "r" + "b" * byte) as fileIn:
+ return pickle.load(fileIn)
diff --git a/rrompy/utilities/poly_fitting/custom_fit.py b/rrompy/utilities/poly_fitting/custom_fit.py
index 622702b..f4164b7 100644
--- a/rrompy/utilities/poly_fitting/custom_fit.py
+++ b/rrompy/utilities/poly_fitting/custom_fit.py
@@ -1,146 +1,135 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
import numpy.linalg as la
from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning
__all__ = ["customFit"]
def customFit(van, y, rcond=None, full=False, w=None):
"""
Least-squares fit of a polynomial to data. Copied from
numpy.polynomial.polynomial.
Parameters
----------
va : array_like, shape (`M`,`deg` + 1)
Vandermonde-like matrix.
y : array_like, shape (`M`,) or (`M`, `K`)
y-coordinates of the sample points. Several sets of sample points
sharing the same x-coordinates can be (independently) fit with one
call to `polyfit` by passing in for `y` a 2-D array that contains
one data set per column.
rcond : float, optional
Relative condition number of the fit. Singular values smaller
than `rcond`, relative to the largest singular value, will be
ignored. The default value is ``len(van)*eps``, where `eps` is the
relative precision of the platform's float type, about 2e-16 in
most cases.
full : bool, optional
Switch determining the nature of the return value. When ``False``
(the default) just the coefficients are returned; when ``True``,
diagnostic information from the singular value decomposition (used
to solve the fit's matrix equation) is also returned.
w : array_like, shape (`M`,), optional
Weights. If not None, the contribution of each point
``(x[i],y[i])`` to the fit is weighted by `w[i]`. Ideally the
weights are chosen so that the errors of the products ``w[i]*y[i]``
all have the same variance. The default value is None.
Returns
-------
coef : ndarray, shape (`deg` + 1,) or (`deg` + 1, `K`)
Polynomial coefficients ordered from low to high. If `y` was 2-D,
the coefficients in column `k` of `coef` represent the polynomial
fit to the data in `y`'s `k`-th column.
[residuals, rank, singular_values, rcond] : list
These values are only returned if `full` = True
resid -- sum of squared residuals of the least squares fit
rank -- the numerical rank of the scaled Vandermonde matrix
sv -- singular values of the scaled Vandermonde matrix
rcond -- value of `rcond`.
For more details, see `linalg.lstsq`.
-
- Raises
- ------
- RankWarning
- Raised if the matrix in the least-squares fit is rank deficient.
- The warning is only raised if `full` == False. The warnings can
- be turned off by:
-
- >>> import warnings
- >>> warnings.simplefilter('ignore', RankWarning)
"""
van = np.asarray(van) + 0.0
y = np.asarray(y) + 0.0
# check arguments.
if van.ndim != 2:
raise RROMPyException("expected 2D vector for van")
if van.size == 0:
raise RROMPyException("expected non-empty vector for van")
if y.ndim < 1 or y.ndim > 2:
raise RROMPyException("expected 1D or 2D array for y")
if len(van) != len(y):
raise RROMPyException("expected van and y to have same length")
order = van.shape[1]
# set up the least squares matrices in transposed form
lhs = van.T
rhs = y.T
if isinstance(w, (str, )) and w.upper() == "AUTO":
# Determine the norms of the design matrix rows.
if issubclass(van.dtype.type, np.complexfloating):
w = np.sqrt((np.square(van.real) + np.square(van.imag)).sum(1))
else:
w = np.sqrt(np.square(van).sum(1))
w[w == 0] = 1
w = np.power(w, -1.)
if w is not None:
w = np.asarray(w) + 0.0
if w.ndim != 1:
raise RROMPyException("expected 1D vector for w")
if len(van) != len(w):
raise RROMPyException("expected van and w to have same length")
# apply weights. Don't use inplace operations as they
# can cause problems with NA.
lhs = lhs * w
rhs = rhs * w
# set rcond
if rcond is None:
rcond = len(van)*np.finfo(van.dtype).eps
# Determine the norms of the design matrix columns.
if issubclass(lhs.dtype.type, np.complexfloating):
scl = np.sqrt((np.square(lhs.real) + np.square(lhs.imag)).sum(1))
else:
scl = np.sqrt(np.square(lhs).sum(1))
scl[scl == 0] = 1
# Solve the least squares problem.
c, resids, rank, s = la.lstsq(lhs.T/scl, rhs.T, rcond)
c = (c.T/scl).T
# warn on rank reduction
if rank != order and not full:
- msg = "The fit may be poorly conditioned"
- RROMPyWarning(msg, np.polynomial.polyutils.RankWarning, stacklevel = 2)
+ RROMPyWarning("The fit may be poorly conditioned", stacklevel = 2)
if full:
return c, [resids, rank, s, rcond]
else:
return c