diff --git a/rrompy/hfengines/base/fenics_engine_base.py b/rrompy/hfengines/base/fenics_engine_base.py
index 0802115..9ab203f 100644
--- a/rrompy/hfengines/base/fenics_engine_base.py
+++ b/rrompy/hfengines/base/fenics_engine_base.py
@@ -1,408 +1,408 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from os import path, mkdir
import fenics as fen
import numpy as np
from matplotlib import pyplot as plt
from .numpy_engine_base import NumpyEngineBase
from rrompy.utilities.base.types import Np1D, strLst, FenFunc, Tuple, List
from rrompy.utilities.base import (purgeList, getNewFilename,
verbosityManager as vbMng)
from rrompy.solver.fenics import L2NormMatrix, fenplot, interp_project
from .boundary_conditions import BoundaryConditions
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['FenicsEngineBase']
class FenicsEngineBase(NumpyEngineBase):
"""Generic solver for parametric fenics problems."""
def __init__(self, degree_threshold : int = np.inf,
verbosity : int = 10, timestamp : bool = True):
super().__init__(verbosity = verbosity, timestamp = timestamp)
self.BCManager = BoundaryConditions("Dirichlet")
self.V = fen.FunctionSpace(fen.UnitSquareMesh(10, 10), "P", 1)
self.degree_threshold = degree_threshold
@property
def V(self):
"""Value of V."""
return self._V
@V.setter
def V(self, V):
if not type(V).__name__ == 'FunctionSpace':
raise RROMPyException("V type not recognized.")
self.dsToBeSet = True
self._V = V
self.u = fen.TrialFunction(V)
self.v = fen.TestFunction(V)
@property
def spacedim(self):
if hasattr(self, "_V"): return self.V.dim()
return super().spacedim
def autoSetDS(self):
"""Set FEniCS boundary measure based on boundary function handles."""
if self.dsToBeSet:
vbMng(self, "INIT", "Initializing boundary measures.", 20)
mesh = self.V.mesh()
NB = self.NeumannBoundary
RB = self.RobinBoundary
boundary_markers = fen.MeshFunction("size_t", mesh,
mesh.topology().dim() - 1)
NB.mark(boundary_markers, 0)
RB.mark(boundary_markers, 1)
self.ds = fen.Measure("ds", domain = mesh,
subdomain_data = boundary_markers)
self.dsToBeSet = False
vbMng(self, "DEL", "Done assembling boundary measures.", 20)
def buildEnergyNormForm(self):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
vbMng(self, "INIT", "Assembling energy matrix.", 20)
self.energyNormMatrix = L2NormMatrix(self.V)
vbMng(self, "DEL", "Done assembling energy matrix.", 20)
def buildEnergyNormDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product.
"""
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
self.energyNormDualMatrix = self.energyNormMatrix
def buildEnergyNormPartialDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
self.energyNormPartialDualMatrix = self.energyNormMatrix
def liftDirichletData(self) -> Np1D:
"""Lift Dirichlet datum."""
if not hasattr(self, "_liftedDirichletDatum"):
liftRe = interp_project(self.DirichletDatum[0], self.V)
liftIm = interp_project(self.DirichletDatum[1], self.V)
self._liftedDirichletDatum = (np.array(liftRe.vector())
+ 1.j * np.array(liftIm.vector()))
return self._liftedDirichletDatum
def reduceQuadratureDegree(self, fun:FenFunc, name:str):
"""Check whether to reduce compiler parameters to degree threshold."""
if not np.isinf(self.degree_threshold):
from ufl.algorithms.estimate_degrees import (
estimate_total_polynomial_degree as ETPD)
try:
deg = ETPD(fun)
except:
return False
if deg > self.degree_threshold:
vbMng(self, "MAIN",
("Reducing quadrature degree from {} to {} for "
"{}.").format(deg, self.degree_threshold, name), 15)
return True
return False
def iterReduceQuadratureDegree(self, funsNames:List[Tuple[FenFunc, str]]):
"""
Iterate reduceQuadratureDegree over list and define reduce compiler
parameters.
"""
if funsNames is not None:
for fun, name in funsNames:
if self.reduceQuadratureDegree(fun, name):
return {"quadrature_degree" : self.degree_threshold}
return {}
def plot(self, u:Np1D, warping : List[callable] = None,
is_state : bool = False, name : str = "u", save : str = None,
what : strLst = 'all', forceNewFile : bool = True,
saveFormat : str = "eps", saveDPI : int = 100, show : bool = True,
fenplotArgs : dict = {}, **figspecs) -> str:
"""
Do some nice plots of the complex-valued function with given dofs.
Args:
u: numpy complex array with function dofs.
warping(optional): Domain warping functions.
is_state(optional): whether given u is value before multiplication
by c. Defaults to False.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
fenplotArgs(optional): Optional arguments for fenplot.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
Returns:
Output filename.
"""
if not is_state and not self.isCEye:
return super().plot(u, warping, False, name, save, what,
- saveFormat, saveDPI, show, fenplotArgs,
- **figspecs)
+ forceNewFile, saveFormat, saveDPI, show,
+ fenplotArgs, **figspecs)
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if len(what) == 0: return
if 'figsize' not in figspecs.keys():
figspecs['figsize'] = (13. * len(what) / 4, 3)
subplotcode = 100 + len(what) * 10
plt.figure(**figspecs)
plt.jet()
if 'ABS' in what:
uAb = fen.Function(self.V)
uAb.vector().set_local(np.abs(u))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uAb, warping = warping, title = "|{0}|".format(name),
**fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if 'PHASE' in what:
uPh = fen.Function(self.V)
uPh.vector().set_local(np.angle(u))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uPh, warping = warping,
title = "phase({0})".format(name), **fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if 'REAL' in what:
uRe = fen.Function(self.V)
uRe.vector().set_local(np.real(u))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uRe, warping = warping, title = "Re({0})".format(name),
**fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if 'IMAG' in what:
uIm = fen.Function(self.V)
uIm.vector().set_local(np.imag(u))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uIm, warping = warping, title = "Im({0})".format(name),
**fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if save is not None:
save = save.strip()
if forceNewFile:
fileOut = getNewFilename("{}_fig_".format(save), saveFormat)
else:
fileOut = "{}_fig.{}".format(save, saveFormat)
plt.savefig(fileOut, format = saveFormat, dpi = saveDPI)
else: fileOut = None
if show:
plt.show()
plt.close()
return fileOut
def plotmesh(self, warping : List[callable] = None, name : str = "Mesh",
save : str = None, forceNewFile : bool = True,
saveFormat : str = "eps", saveDPI : int = 100,
show : bool = True, fenplotArgs : dict = {},
**figspecs) -> str:
"""
Do a nice plot of the mesh.
Args:
u: numpy complex array with function dofs.
warping(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
forceNewFile(optional): Whether to create new output file.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
fenplotArgs(optional): Optional arguments for fenplot.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
Returns:
Output filename.
"""
plt.figure(**figspecs)
fenplot(self.V.mesh(), warping = warping, **fenplotArgs)
if save is not None:
save = save.strip()
if forceNewFile:
fileOut = getNewFilename("{}_msh_".format(save), saveFormat)
else:
fileOut = "{}_msh.{}".format(save, saveFormat)
plt.savefig(fileOut, format = saveFormat, dpi = saveDPI)
else: fileOut = None
if show:
plt.show()
plt.close()
return fileOut
def outParaview(self, u:Np1D, warping : List[callable] = None,
is_state : bool = False, name : str = "u",
filename : str = "out", time : float = 0.,
what : strLst = 'all', forceNewFile : bool = True,
folder : bool = False, filePW = None) -> str:
"""
Output complex-valued function with given dofs to ParaView file.
Args:
u: numpy complex array with function dofs.
warping(optional): Domain warping functions.
is_state(optional): whether given u is value before multiplication
by c. Defaults to False.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
time(optional): Timestamp.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
folder(optional): Whether to create an additional folder layer.
filePW(optional): Fenics File entity (for time series).
Returns:
Output filename.
"""
if not is_state and not self.isCEye:
raise RROMPyException(("Cannot output to Paraview non-state "
"object."))
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['MESH', 'ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if len(what) == 0: return
if filePW is None:
if folder:
if not path.exists(filename + "/"):
mkdir(filename)
idxpath = filename.rfind("/")
filename += "/" + filename[idxpath + 1 :]
if forceNewFile:
filePW = fen.File(getNewFilename(filename, "pvd"))
else:
filePW = fen.File("{}.pvd".format(filename))
if warping is not None:
fen.ALE.move(self.V.mesh(),
interp_project(warping[0], self.V.mesh()))
if what == ['MESH']:
filePW << (self.V.mesh(), time)
if 'ABS' in what:
uAb = fen.Function(self.V, name = "{}_ABS".format(name))
uAb.vector().set_local(np.abs(u))
filePW << (uAb, time)
if 'PHASE' in what:
uPh = fen.Function(self.V, name = "{}_PHASE".format(name))
uPh.vector().set_local(np.angle(u))
filePW << (uPh, time)
if 'REAL' in what:
uRe = fen.Function(self.V, name = "{}_REAL".format(name))
uRe.vector().set_local(np.real(u))
filePW << (uRe, time)
if 'IMAG' in what:
uIm = fen.Function(self.V, name = "{}_IMAG".format(name))
uIm.vector().set_local(np.imag(u))
filePW << (uIm, time)
if warping is not None:
fen.ALE.move(self.V.mesh(),
interp_project(warping[1], self.V.mesh()))
return filePW
def outParaviewTimeDomain(self, u:Np1D, omega:float,
warping : List[callable] = None,
is_state : bool = False,
timeFinal : float = None,
periodResolution : int = 20, name : str = "u",
filename : str = "out",
forceNewFile : bool = True,
folder : bool = False) -> str:
"""
Output complex-valued function with given dofs to ParaView file,
converted to time domain.
Args:
u: numpy complex array with function dofs.
omega: frequency.
warping(optional): Domain warping functions.
is_state(optional): whether given u is value before multiplication
by c. Defaults to False.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
folder(optional): Whether to create an additional folder layer.
Returns:
Output filename.
"""
if not is_state and not self.isCEye:
raise RROMPyException(("Cannot output to Paraview non-state "
"object."))
if folder:
if not path.exists(filename + "/"):
mkdir(filename)
idxpath = filename.rfind("/")
filename += "/" + filename[idxpath + 1 :]
if forceNewFile:
filePW = fen.File(getNewFilename(filename, "pvd"))
else:
filePW = fen.File("{}.pvd".format(filename))
omega = np.abs(omega)
t = 0.
dt = 2. * np.pi / omega / periodResolution
if timeFinal is None: timeFinal = 2. * np.pi / omega - dt
if warping is not None:
fen.ALE.move(self.V.mesh(),
interp_project(warping[0], self.V.mesh()))
for j in range(int(np.ceil(timeFinal / dt)) + 1):
ut = fen.Function(self.V, name = name)
ut.vector().set_local(np.real(u) * np.cos(omega * t)
+ np.imag(u) * np.sin(omega * t))
filePW << (ut, t)
t += dt
if warping is not None:
fen.ALE.move(self.V.mesh(),
interp_project(warping[1], self.V.mesh()))
return filePW
diff --git a/rrompy/hfengines/base/vector_fenics_engine_base.py b/rrompy/hfengines/base/vector_fenics_engine_base.py
index 4c45f1b..87d82ab 100644
--- a/rrompy/hfengines/base/vector_fenics_engine_base.py
+++ b/rrompy/hfengines/base/vector_fenics_engine_base.py
@@ -1,193 +1,193 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import fenics as fen
import numpy as np
from matplotlib import pyplot as plt
from .fenics_engine_base import FenicsEngineBase
from rrompy.utilities.base.types import Np1D, List, strLst
from rrompy.utilities.base import purgeList, getNewFilename
from rrompy.solver.fenics import fenplot
__all__ = ['VectorFenicsEngineBase']
class VectorFenicsEngineBase(FenicsEngineBase):
"""Generic solver for parametric vector fenics problems."""
def __init__(self, degree_threshold : int = np.inf,
verbosity : int = 10, timestamp : bool = True):
super().__init__(degree_threshold = degree_threshold,
verbosity = verbosity, timestamp = timestamp)
self.V = fen.VectorFunctionSpace(fen.UnitSquareMesh(10, 10), "P", 1)
def plot(self, u:Np1D, warping : List[callable] = None,
is_state : bool = False, name : str = "u", save : str = None,
what : strLst = 'all', forceNewFile : bool = True,
saveFormat : str = "eps", saveDPI : int = 100, show : bool = True,
fenplotArgs : dict = {}, **figspecs) -> str:
"""
Do some nice plots of the complex-valued function with given dofs.
Args:
u: numpy complex array with function dofs.
warping(optional): Domain warping functions.
is_state(optional): whether given u is value before multiplication
by c. Defaults to False.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to.
show(optional): Whether to show figure. Defaults to True.
fenplotArgs(optional): Optional arguments for fenplot.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
Returns:
Output filename.
"""
if not is_state and not self.isCEye:
return super().plot(u, warping, False, name, save, what,
- saveFormat, saveDPI, show, fenplotArgs,
- **figspecs)
+ forceNewFile, saveFormat, saveDPI, show,
+ fenplotArgs, **figspecs)
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if 'figsize' not in figspecs.keys():
figspecs['figsize'] = (13. * max(len(what), 1) / 4, 3)
if len(what) > 0:
for j in range(self.V.num_sub_spaces()):
subplotcode = 100 + len(what) * 10
II = self.V.sub(j).dofmap().dofs()
Vj = self.V.sub(j).collapse()
plt.figure(**figspecs)
plt.jet()
if 'ABS' in what:
uAb = fen.Function(Vj)
uAb.vector().set_local(np.abs(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uAb, warping = warping,
title = "|{}_comp{}|".format(name, j,
**fenplotArgs))
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if 'PHASE' in what:
uPh = fen.Function(Vj)
uPh.vector().set_local(np.angle(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uPh, warping = warping,
title = "phase({}_comp{})".format(name, j),
**fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if 'REAL' in what:
uRe = fen.Function(Vj)
uRe.vector().set_local(np.real(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uRe, warping = warping,
title = "Re({}_comp{})".format(name, j),
**fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if 'IMAG' in what:
uIm = fen.Function(Vj)
uIm.vector().set_local(np.imag(u[II]))
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
p = fenplot(uIm, warping = warping,
title = "Im({}_comp{})".format(name, j),
**fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if save is not None:
save = save.strip()
if forceNewFile:
fileOut = getNewFilename(
"{}_comp{}_fig_".format(save, j),
saveFormat)
else:
fileOut = "{}_comp{}_fig.{}".format(save,j,saveFormat)
plt.savefig(fileOut, format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
try:
if len(what) > 1:
figspecs['figsize'] = (2. / len(what) * figspecs['figsize'][0],
figspecs['figsize'][1])
elif len(what) == 0:
figspecs['figsize'] = (2. * figspecs['figsize'][0],
figspecs['figsize'][1])
if len(what) == 0 or 'ABS' in what or 'REAL' in what:
uVRe = fen.Function(self.V)
uVRe.vector().set_local(np.real(u))
plt.figure(**figspecs)
plt.jet()
p = fenplot(uVRe, warping = warping,
title = "{}_Re".format(name),
mode = "displacement", **fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if save is not None:
save = save.strip()
if forceNewFile:
fileOut = getNewFilename(
"{}_disp_Re_fig_".format(save),
saveFormat)
else:
fileOut = "{}_disp_Re_fig.{}".format(save, saveFormat)
plt.savefig(fileOut, format = saveFormat, dpi = saveDPI)
plt.show()
plt.close()
if 'ABS' in what or 'IMAG' in what:
uVIm = fen.Function(self.V)
uVIm.vector().set_local(np.imag(u))
plt.figure(**figspecs)
plt.jet()
p = fenplot(uVIm, warping = warping,
title = "{}_Im".format(name),
mode = "displacement", **fenplotArgs)
if self.V.mesh().geometric_dimension() > 1:
plt.colorbar(p)
if save is not None:
save = save.strip()
if forceNewFile:
fileOut = getNewFilename(
"{}_disp_Im_fig_".format(save),
saveFormat)
else:
fileOut = "{}_disp_Im_fig.{}".format(save, saveFormat)
plt.savefig(fileOut, format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
except:
pass
diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py
index b52ab89..3ad8127 100644
--- a/rrompy/reduction_methods/base/generic_approximant.py
+++ b/rrompy/reduction_methods/base/generic_approximant.py
@@ -1,908 +1,852 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from abc import abstractmethod
import numpy as np
from itertools import product as iterprod
from copy import deepcopy as copy
from os import remove as osrm
from rrompy.sampling.standard import (SamplingEngineStandard,
SamplingEngineStandardPOD)
from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, Tuple,
ListAny, strLst, paramVal, paramList,
sampList)
from rrompy.utilities.base import (purgeDict, verbosityManager as vbMng,
getNewFilename)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPy_READY, RROMPy_FRAGILE)
from rrompy.utilities.base import pickleDump, pickleLoad
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import sampleList, emptySampleList
__all__ = ['GenericApproximant']
def addNormFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D:
uV = getattr(self.__class__, "get" + fieldName)(self, mu)
kwargs["is_state"] = False
val = self.HFEngine.norm(uV, *args, **kwargs)
return val
setattr(self.__class__, "norm" + fieldName, objFunc)
def addNormDualFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D:
uV = getattr(self.__class__, "get" + fieldName)(self, mu)
kwargs["is_state"] = True
if "dual" not in kwargs.keys(): kwargs["dual"] = True
val = self.HFEngine.norm(uV, *args, **kwargs)
return val
setattr(self.__class__, "norm" + fieldName, objFunc)
def addPlotFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, *args, **kwargs):
uV = getattr(self.__class__, "get" + fieldName)(self, mu)
kwargsCopy = copy(kwargs)
filesOut = []
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
filesOut += [self.HFEngine.plot(u, *args, **kwargsCopy)]
if filesOut[0] is None: return None
return filesOut
setattr(self.__class__, "plot" + fieldName, objFunc)
def addPlotDualFieldToClass(self, fieldName):
def objFunc(self, mu:paramList, *args, **kwargs):
uV = getattr(self.__class__, "get" + fieldName)(self, mu)
kwargsCopy = copy(kwargs)
filesOut = []
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
filesOut += [self.HFEngine.plot(u, *args, **kwargsCopy)]
if filesOut[0] is None: return None
return filesOut
setattr(self.__class__, "plot" + fieldName, objFunc)
def addOutParaviewFieldToClass(self, fieldName):
def objFunc(self, mu:paramVal, *args, **kwargs):
if not hasattr(self.HFEngine, "outParaview"):
raise RROMPyException(("High fidelity engine cannot output to "
"Paraview."))
uV = getattr(self.__class__, "get" + fieldName)(self, mu)
kwargsCopy = copy(kwargs)
filesOut = []
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
filesOut += [self.HFEngine.outParaview(u, *args, **kwargsCopy)]
if filesOut[0] is None: return None
return filesOut
setattr(self.__class__, "outParaview" + fieldName, objFunc)
def addOutParaviewTimeDomainFieldToClass(self, fieldName):
def objFunc(self, mu:paramVal, *args, **kwargs):
if not hasattr(self.HFEngine, "outParaviewTimeDomain"):
raise RROMPyException(("High fidelity engine cannot output to "
"Paraview."))
uV = getattr(self.__class__, "get" + fieldName)(self, mu)
omega = args.pop(0) if len(args) > 0 else np.real(mu)
kwargsCopy = copy(kwargs)
filesOut = []
for j, u in enumerate(uV):
if "name" in kwargs.keys():
kwargsCopy["name"] = kwargs["name"] + str(j)
filesOut += [self.HFEngine.outParaviewTimeDomain(u, omega, *args,
**kwargsCopy)]
if filesOut[0] is None: return None
return filesOut
setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc)
def getTrainedModelClass(name):
from importlib import import_module as im
try:
return getattr(im("rrompy.reduction_methods.trained_model"), name)
except:
raise RROMPyException("Trained model name not recognized.")
class GenericApproximant:
"""
ABSTRACT
ROM approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
trainedModel: Trained model evaluator.
mu0: Default parameter.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList{Soft,Critical}.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
S: Number of solution snapshots over which current approximant is
based upon.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
__all__ += [ftype + dtype for ftype, dtype in iterprod(
["norm", "plot", "outParaview", "outParaviewTimeDomain"],
["HF", "RHS", "Approx", "Res", "Err"])]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._mode = RROMPy_READY
self.approx_state = approx_state
self.verbosity = verbosity
self.timestamp = timestamp
vbMng(self, "INIT",
"Initializing engine of type {}.".format(self.name()), 10)
self._HFEngine = HFEngine
self.trainedModel = None
self.lastSolvedHF = emptyParameterList()
self.uHF = emptySampleList()
self._addParametersToList(["POD"], [True], ["S"], [1])
if mu0 is None:
if hasattr(self.HFEngine, "mu0"):
self.mu0 = checkParameter(self.HFEngine.mu0)
else:
raise RROMPyException(("Center of approximation cannot be "
"inferred from HF engine. Parameter "
"required"))
else:
self.mu0 = checkParameter(mu0, self.HFEngine.npar)
self.resetSamples()
self.approxParameters = approxParameters
self._postInit()
### add norm{HF,Err} methods
"""
Compute norm of * at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Target norm of *.
"""
for objName in ["HF", "Err"]:
addNormFieldToClass(self, objName)
### add norm{RHS,Res} methods
"""
Compute norm of * at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Target norm of *.
"""
for objName in ["RHS", "Res"]:
addNormDualFieldToClass(self, objName)
### add plot{HF,Approx,Err} methods
"""
Do some nice plots of * at arbitrary parameter.
Args:
mu: Target parameter.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for objName in ["HF", "Approx", "Err"]:
addPlotFieldToClass(self, objName)
### add plot{RHS,Res} methods
"""
Do some nice plots of * at arbitrary parameter.
Args:
mu: Target parameter.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for objName in ["RHS", "Res"]:
addPlotDualFieldToClass(self, objName)
### add outParaview{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file.
Args:
mu: Target parameter.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
time(optional): Timestamp.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
filePW(optional): Fenics File entity (for time series).
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewFieldToClass(self, objName)
### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods
"""
Output * to ParaView file, converted to time domain.
Args:
mu: Target parameter.
omega(optional): frequency.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
"""
for objName in ["HF", "RHS", "Approx", "Res", "Err"]:
addOutParaviewTimeDomainFieldToClass(self, objName)
def _preInit(self):
if not hasattr(self, "depth"): self.depth = 0
else: self.depth += 1
@property
def tModelType(self):
raise RROMPyException("No trainedModel type assigned.")
def initializeModelData(self, datadict):
from rrompy.reduction_methods.trained_model import TrainedModelData
return (TrainedModelData(datadict["mu0"], datadict.pop("projMat"),
datadict["scaleFactor"],
datadict.pop("rescalingExp")),
["mu0", "scaleFactor", "mus"])
@property
def parameterList(self):
"""Value of parameterListSoft + parameterListCritical."""
return self.parameterListSoft + self.parameterListCritical
def _addParametersToList(self, whatSoft : strLst = [],
defaultSoft : ListAny = [],
whatCritical : strLst = [],
defaultCritical : ListAny = [],
toBeExcluded : strLst = []):
if not hasattr(self, "parameterToBeExcluded"):
self.parameterToBeExcluded = []
self.parameterToBeExcluded = toBeExcluded + self.parameterToBeExcluded
if not hasattr(self, "parameterListSoft"):
self.parameterListSoft = []
if not hasattr(self, "parameterDefaultSoft"):
self.parameterDefaultSoft = {}
if not hasattr(self, "parameterListCritical"):
self.parameterListCritical = []
if not hasattr(self, "parameterDefaultCritical"):
self.parameterDefaultCritical = {}
for j, what in enumerate(whatSoft):
if what not in self.parameterToBeExcluded:
self.parameterListSoft = [what] + self.parameterListSoft
self.parameterDefaultSoft[what] = defaultSoft[j]
for j, what in enumerate(whatCritical):
if what not in self.parameterToBeExcluded:
self.parameterListCritical = ([what]
+ self.parameterListCritical)
self.parameterDefaultCritical[what] = defaultCritical[j]
def _postInit(self):
if self.depth == 0:
vbMng(self, "DEL", "Done initializing.", 10)
del self.depth
else: self.depth -= 1
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def setupSampling(self):
"""Setup sampling engine."""
RROMPyAssert(self._mode, message = "Cannot setup sampling engine.")
if not hasattr(self, "_POD") or self._POD is None: return
if self.POD:
SamplingEngine = SamplingEngineStandardPOD
else:
SamplingEngine = SamplingEngineStandard
self.samplingEngine = SamplingEngine(self.HFEngine,
sample_state = self.approx_state,
verbosity = self.verbosity)
@property
def HFEngine(self):
"""Value of HFEngine."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
raise RROMPyException("Cannot change HFEngine.")
@property
def mu0(self):
"""Value of mu0."""
return self._mu0
@mu0.setter
def mu0(self, mu0):
mu0 = checkParameter(mu0)
if not hasattr(self, "_mu0") or mu0 != self.mu0:
self.resetSamples()
self._mu0 = mu0
@property
def npar(self):
"""Number of parameters."""
return self.mu0.shape[1]
@property
def approxParameters(self):
"""Value of approximant parameters."""
return self._approxParameters
@approxParameters.setter
def approxParameters(self, approxParams):
if not hasattr(self, "approxParameters"):
self._approxParameters = {}
approxParameters = purgeDict(approxParams, self.parameterList,
dictname = self.name() + ".approxParameters",
baselevel = 1)
keyList = list(approxParameters.keys())
for key in self.parameterListCritical:
if key in keyList:
setattr(self, "_" + key, self.parameterDefaultCritical[key])
for key in self.parameterListSoft:
if key in keyList:
setattr(self, "_" + key, self.parameterDefaultSoft[key])
fragile = False
for key in self.parameterListCritical:
if key in keyList:
val = approxParameters[key]
else:
val = getattr(self, "_" + key, None)
if val is None:
val = self.parameterDefaultCritical[key]
getattr(self.__class__, key, None).fset(self, val)
fragile = fragile or val is None
for key in self.parameterListSoft:
if key in keyList:
val = approxParameters[key]
else:
val = getattr(self, "_" + key, None)
if val is None:
val = self.parameterDefaultSoft[key]
getattr(self.__class__, key, None).fset(self, val)
if fragile:
self._mode = RROMPy_FRAGILE
@property
def POD(self):
"""Value of POD."""
return self._POD
@POD.setter
def POD(self, POD):
if hasattr(self, "_POD"): PODold = self.POD
else: PODold = -1
self._POD = POD
self._approxParameters["POD"] = self.POD
if PODold != self.POD:
self.samplingEngine = None
self.resetSamples()
@property
def approx_state(self):
"""Value of approx_state."""
return self._approx_state
@approx_state.setter
def approx_state(self, approx_state):
if hasattr(self, "_approx_state"): approx_stateold = self.approx_state
else: approx_stateold = -1
self._approx_state = approx_state
if approx_stateold != self.approx_state:
self.samplingEngine = None
self.resetSamples()
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
if S <= 0: raise RROMPyException("S must be positive.")
if hasattr(self, "_S") and self._S is not None: Sold = self.S
else: Sold = -1
self._S = S
self._approxParameters["S"] = self.S
if Sold != self.S: self.resetSamples()
@property
def trainedModel(self):
"""Value of trainedModel."""
return self._trainedModel
@trainedModel.setter
def trainedModel(self, trainedModel):
self._trainedModel = trainedModel
if self._trainedModel is not None:
self._trainedModel.reset()
self.lastSolvedApproxReduced = emptyParameterList()
self.lastSolvedApprox = emptyParameterList()
self.uApproxReduced = emptySampleList()
self.uApprox = emptySampleList()
def resetSamples(self):
if hasattr(self, "samplingEngine") and self.samplingEngine is not None:
self.samplingEngine.resetHistory()
else:
self.setupSampling()
self._mode = RROMPy_READY
- def plotSamples(self, warping : List[callable] = None, name : str = "u",
- save : str = None, what : strLst = 'all',
- saveFormat : str = "eps", saveDPI : int = 100,
- show : bool = True, plotArgs : dict = {},
- **figspecs) -> List[str]:
+ def plotSamples(self, *args, **kwargs) -> List[str]:
"""
Do some nice plots of the samples.
- Args:
- warping(optional): Domain warping functions.
- name(optional): Name to be shown as title of the plots. Defaults to
- 'u'.
- what(optional): Which plots to do. If list, can contain 'ABS',
- 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
- Defaults to 'ALL'.
- save(optional): Where to save plot(s). Defaults to None, i.e. no
- saving.
- saveFormat(optional): Format for saved plot(s). Defaults to "eps".
- saveDPI(optional): DPI for saved plot(s). Defaults to 100.
- show(optional): Whether to show figure. Defaults to True.
- plotArgs(optional): Optional arguments for fen/pyplot.
- figspecs(optional key args): Optional arguments for matplotlib
- figure creation.
-
Returns:
Output filenames.
"""
RROMPyAssert(self._mode, message = "Cannot plot samples.")
- return self.samplingEngine.plotSamples(warping, name, save, what,
- saveFormat, saveDPI, show,
- plotArgs, **figspecs)
+ return self.samplingEngine.plotSamples(*args, **kwargs)
- def outParaviewSamples(self, name : str = "u", filename : str = "out",
- times : Np1D = None, what : strLst = 'all',
- forceNewFile : bool = True, folders : bool = False,
- filePW = None) -> List[str]:
+ def outParaviewSamples(self, *args, **kwargs) -> List[str]:
"""
Output samples to ParaView file.
- Args:
- name(optional): Base name to be used for data output.
- filename(optional): Name of output file.
- times(optional): Timestamps.
- what(optional): Which plots to do. If list, can contain 'MESH',
- 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
- 'ALL'. Defaults to 'ALL'.
- forceNewFile(optional): Whether to create new output file.
- folders(optional): Whether to split output in folders.
- filePW(optional): Fenics File entity (for time series).
-
Returns:
Output filenames.
"""
RROMPyAssert(self._mode, message = "Cannot output samples.")
- return self.samplingEngine.outParaviewSamples(name, folders, filename,
- times, what,
- forceNewFile, filePW)
+ return self.samplingEngine.outParaviewSamples(*args, **kwargs)
- def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
- timeFinal : Np1D = None,
- periodResolution : int = 20,
- name : str = "u",
- filename : str = "out",
- forceNewFile : bool = True,
- folders : bool = False) -> List[str]:
+ def outParaviewTimeDomainSamples(self, *args, **kwargs) -> List[str]:
"""
Output samples to ParaView file, converted to time domain.
- Args:
- omegas(optional): frequencies.
- timeFinal(optional): final time of simulation.
- periodResolution(optional): number of time steps per period.
- name(optional): Base name to be used for data output.
- filename(optional): Name of output file.
- forceNewFile(optional): Whether to create new output file.
- folders(optional): Whether to split output in folders.
-
Returns:
Output filenames.
"""
RROMPyAssert(self._mode, message = "Cannot output samples.")
- return self.samplingEngine.outParaviewTimeDomainSamples(
- omegas, timeFinal,
- periodResolution,
- name, folders, filename,
- forceNewFile)
+ return self.samplingEngine.outParaviewTimeDomainSamples(*args,
+ **kwargs)
def setSamples(self, samplingEngine):
"""Copy samplingEngine and samples."""
vbMng(self, "INIT", "Transfering samples.", 10)
self.samplingEngine = copy(samplingEngine)
vbMng(self, "DEL", "Done transfering samples.", 10)
def setTrainedModel(self, model):
"""Deepcopy approximation from trained model."""
if hasattr(model, "storeTrainedModel"):
verb = model.verbosity
model.verbosity = 0
fileOut = model.storeTrainedModel()
model.verbosity = verb
else:
try:
fileOut = getNewFilename("trained_model", "pkl")
pickleDump(model.data.__dict__, fileOut)
except:
raise RROMPyException(("Failed to store model data. Parameter "
"model must have either "
"storeTrainedModel or "
"data.__dict__ properties."))
self.loadTrainedModel(fileOut)
osrm(fileOut)
@abstractmethod
def setupApprox(self):
"""
Setup approximant. (ABSTRACT)
Any specialization should include something like
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
...
self.trainedModel = ...
self.trainedModel.data = ...
self.trainedModel.data.approxParameters = copy(
self.approxParameters)
"""
pass
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None
and self.trainedModel.data.approxParameters == self.approxParameters)
def _pruneBeforeEval(self, mu:paramList, field:str, append:bool,
prune:bool) -> Tuple[paramList, Np1D, Np1D, bool]:
mu = checkParameterList(mu, self.npar)[0]
idx = np.empty(len(mu), dtype = np.int)
if prune:
jExtra = np.zeros(len(mu), dtype = bool)
muExtra = emptyParameterList()
lastSolvedMus = getattr(self, "lastSolved" + field)
if (len(mu) > 0 and len(mu) == len(lastSolvedMus)
and mu == lastSolvedMus):
idx = np.arange(len(mu), dtype = np.int)
return muExtra, jExtra, idx, True
muKeep = copy(muExtra)
for j in range(len(mu)):
jPos = lastSolvedMus.find(mu[j])
if jPos is not None:
idx[j] = jPos
muKeep.append(mu[j])
else:
jExtra[j] = True
muExtra.append(mu[j])
if len(muKeep) > 0 and not append:
lastSolvedu = getattr(self, "u" + field)
idx[~jExtra] = getattr(self.__class__, "set" + field)(self,
muKeep, lastSolvedu[idx[~jExtra]], append)
append = True
else:
jExtra = np.ones(len(mu), dtype = bool)
muExtra = mu
return muExtra, jExtra, idx, append
def _setObject(self, mu:paramList, field:str, object:sampList,
append:bool) -> List[int]:
newMus = checkParameterList(mu, self.npar)[0]
newObj = sampleList(object)
if append:
getattr(self, "lastSolved" + field).append(newMus)
getattr(self, "u" + field).append(newObj)
Ltot = len(getattr(self, "u" + field))
return list(range(Ltot - len(newObj), Ltot))
setattr(self, "lastSolved" + field, copy(newMus))
setattr(self, "u" + field, copy(newObj))
return list(range(len(getattr(self, "u" + field))))
def setHF(self, muHF:paramList, uHF:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
return self._setObject(muHF, "HF", uHF, append)
def evalHF(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Find high fidelity solution with original parameters and arbitrary
parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "HF", append,
prune)
if len(muExtra) > 0:
vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu),
15)
newuHFs = self.HFEngine.solve(muExtra)
vbMng(self, "DEL", "Done solving HF model.", 15)
idx[jExtra] = self.setHF(muExtra, newuHFs, append)
return list(idx)
def setApproxReduced(self, muApproxR:paramList, uApproxR:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
return self._setObject(muApproxR, "ApproxReduced", uApproxR, append)
def evalApproxReduced(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
self.setupApprox()
muExtra, jExtra, idx, append = self._pruneBeforeEval(mu,
"ApproxReduced",
append, prune)
if len(muExtra) > 0:
newuApproxs = self.trainedModel.getApproxReduced(muExtra)
idx[jExtra] = self.setApproxReduced(muExtra, newuApproxs, append)
return list(idx)
def setApprox(self, muApprox:paramList, uApprox:sampleList,
append : bool = False) -> List[int]:
"""Assign high fidelity solution."""
return self._setObject(muApprox, "Approx", uApprox, append)
def evalApprox(self, mu:paramList, append : bool = False,
prune : bool = True) -> List[int]:
"""
Evaluate approximant at arbitrary parameter.
Args:
mu: Target parameter.
append(optional): Whether to append new HF solutions to old ones.
prune(optional): Whether to remove duplicates of already appearing
HF solutions.
"""
self.setupApprox()
muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "Approx",
append, prune)
if len(muExtra) > 0:
newuApproxs = self.trainedModel.getApprox(muExtra)
idx[jExtra] = self.setApprox(muExtra, newuApproxs, append)
return list(idx)
def getHF(self, mu:paramList, append : bool = False,
prune : bool = True) -> sampList:
"""
Get HF solution at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
HFsolution.
"""
mu = checkParameterList(mu, self.npar)[0]
idx = self.evalHF(mu, append = append, prune = prune)
return self.uHF(idx)
def getRHS(self, mu:paramList) -> sampList:
"""
Get linear system RHS at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Linear system RHS.
"""
return self.HFEngine.residual(mu, None)
def getApproxReduced(self, mu:paramList, append : bool = False,
prune : bool = True) -> sampList:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Reduced approximant.
"""
mu = checkParameterList(mu, self.npar)[0]
idx = self.evalApproxReduced(mu, append = append, prune = prune)
return self.uApproxReduced(idx)
def getApprox(self, mu:paramList, append : bool = False,
prune : bool = True) -> sampList:
"""
Get approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Approximant.
"""
mu = checkParameterList(mu, self.npar)[0]
idx = self.evalApprox(mu, append = append, prune = prune)
return self.uApprox(idx)
def getRes(self, mu:paramList) -> sampList:
"""
Get residual at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Approximant residual.
"""
if not self.HFEngine.isCEye:
raise RROMPyException(("Residual of solution with non-scalar C "
"not computable."))
return self.HFEngine.residual(mu, self.getApprox(mu) / self.HFEngine.C)
def getErr(self, mu:paramList, append : bool = False,
prune : bool = True) -> sampList:
"""
Get error at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Approximant error.
"""
return (self.getApprox(mu, append = append, prune =prune)
- self.getHF(mu, append = append, prune = prune))
def normApprox(self, mu:paramList) -> float:
"""
Compute norm of approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Target norm of approximant.
"""
if not (self.POD and self.HFEngine.isCEye):
return self.HFEngine.norm(self.getApprox(mu), is_state = False)
return np.linalg.norm(self.HFEngine.C * self.getApproxReduced(mu).data,
axis = 0)
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
self.setupApprox()
vbMng(self, "INIT", "Computing poles of model.", 20)
poles = self.trainedModel.getPoles(*args, **kwargs)
vbMng(self, "DEL", "Done computing poles.", 20)
return poles
def storeTrainedModel(self, filenameBase : str = "trained_model",
forceNewFile : bool = True) -> str:
"""Store trained reduced model to file."""
self.setupApprox()
vbMng(self, "INIT", "Storing trained model to file.", 20)
if forceNewFile:
filename = getNewFilename(filenameBase, "pkl")
else:
filename = "{}.pkl".format(filenameBase)
pickleDump(self.trainedModel.data.__dict__, filename)
vbMng(self, "DEL", "Done storing trained model.", 20)
return filename
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
vbMng(self, "INIT", "Loading pre-trained model from file.", 20)
datadict = pickleLoad(filename)
self.mu0 = datadict["mu0"]
self.scaleFactor = datadict["scaleFactor"]
self.mus = datadict["mus"]
trainedModel = self.tModelType()
trainedModel.verbosity = self.verbosity
trainedModel.timestamp = self.timestamp
data, selfkeys = self.initializeModelData(datadict)
for key in selfkeys: setattr(self, key, datadict.pop(key))
approxParameters = datadict.pop("approxParameters")
data.approxParameters = copy(approxParameters)
for apkey in data.approxParameters.keys():
self._approxParameters[apkey] = approxParameters.pop(apkey)
setattr(self, "_" + apkey, self._approxParameters[apkey])
for key in datadict: setattr(data, key, datadict[key])
trainedModel.data = data
self.trainedModel = trainedModel
self._mode = RROMPy_FRAGILE
vbMng(self, "DEL", "Done loading pre-trained model.", 20)
diff --git a/rrompy/sampling/base/sampling_engine_base.py b/rrompy/sampling/base/sampling_engine_base.py
index d34c933..6035c76 100644
--- a/rrompy/sampling/base/sampling_engine_base.py
+++ b/rrompy/sampling/base/sampling_engine_base.py
@@ -1,206 +1,190 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
-from rrompy.utilities.base.types import (Np1D, HFEng, List, strLst, paramVal,
+from rrompy.utilities.base.types import (Np1D, HFEng, List, paramVal,
paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import emptySampleList
__all__ = ['SamplingEngineBase']
class SamplingEngineBase:
"""HERE"""
def __init__(self, HFEngine:HFEng, sample_state : bool = False,
verbosity : int = 10, timestamp : bool = True):
self.sample_state = sample_state
self.verbosity = verbosity
self.timestamp = timestamp
vbMng(self, "INIT",
"Initializing sampling engine of type {}.".format(self.name()),
10)
self.HFEngine = HFEngine
vbMng(self, "DEL", "Done initializing sampling engine.", 10)
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def resetHistory(self):
self.samples = emptySampleList()
self.nsamples = 0
self.mus = emptyParameterList()
self._derIdxs = []
def popSample(self):
if hasattr(self, "nsamples") and self.nsamples > 1:
if self.samples.shape[1] > self.nsamples:
RROMPyWarning(("More than 'nsamples' memory allocated for "
"samples. Popping empty sample column."))
self.nsamples += 1
self.nsamples -= 1
self.samples.pop()
self.mus.pop()
else:
self.resetHistory()
def preallocateSamples(self, u:sampList, mu:paramVal, n:int):
self.samples.reset((u.shape[0], n), u.dtype)
self.samples[0] = u
mu = checkParameter(mu, self.HFEngine.npar)
self.mus.reset((n, self.HFEngine.npar))
self.mus[0] = mu[0]
@property
def HFEngine(self):
"""Value of HFEngine. Its assignment resets history."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
self._HFEngine = HFEngine
self.resetHistory()
def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList:
"""
Solve linear system.
Args:
mu: Parameter value.
Returns:
Solution of system.
"""
mu = checkParameterList(mu, self.HFEngine.npar)[0]
vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15)
u = self.HFEngine.solve(mu, RHS, return_state = self.sample_state,
verbose = (self.verbosity >= 20))
vbMng(self, "DEL", "Done solving HF model.", 15)
return u
- def plotSamples(self, warping : List[callable] = None, name : str = "u",
- save : str = None, what : strLst = 'all',
- saveFormat : str = "eps", saveDPI : int = 100,
- show : bool = True, plotArgs : dict = {},
- **figspecs) -> List[str]:
+ def plotSamples(self, warpings : List[List[callable]] = None,
+ name : str = "u", **kwargs) -> List[str]:
"""
Do some nice plots of the samples.
Args:
- warping(optional): Domain warping functions.
+ warpings(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
- save(optional): Where to save plot(s). Defaults to None, i.e. no
- saving.
- what(optional): Which plots to do. If list, can contain 'ABS',
- 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
- Defaults to 'ALL'.
- saveFormat(optional): Format for saved plot(s). Defaults to "eps".
- saveDPI(optional): DPI for saved plot(s). Defaults to 100.
- show(optional): Whether to show figure. Defaults to True.
- plotArgs(optional): Optional arguments for fen/pyplot.
- figspecs(optional key args): Optional arguments for matplotlib
- figure creation.
Returns:
Output filenames.
"""
+ if warpings is None: warpings = [None] * self.nsamples
filesOut = [None] * self.nsamples
for j in range(self.nsamples):
- filesOut[j] = self.HFEngine.plot(self.samples[j], warping, False,
- "{}_{}".format(name, j), save,
- what, saveFormat, saveDPI, show,
- plotArgs, **figspecs)
+ filesOut[j] = self.HFEngine.plot(self.samples[j], warpings[j],
+ self.sample_state,
+ "{}_{}".format(name, j), **kwargs)
if filesOut[0] is None: return None
return filesOut
- def outParaviewSamples(self, name : str = "u", folders : bool = True,
- filename : str = "out", times : Np1D = None,
- what : strLst = 'all', forceNewFile : bool = True,
- filePW = None) -> List[str]:
+ def outParaviewSamples(self, warpings : List[List[callable]] = None,
+ name : str = "u", filename : str = "out",
+ times : Np1D = None, **kwargs) -> List[str]:
"""
Output samples to ParaView file.
Args:
+ warpings(optional): Domain warping functions.
name(optional): Base name to be used for data output.
- folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
times(optional): Timestamps.
- what(optional): Which plots to do. If list, can contain 'MESH',
- 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
- 'ALL'. Defaults to 'ALL'.
- forceNewFile(optional): Whether to create new output file.
- filePW(optional): Fenics File entity (for time series).
Returns:
Output filenames.
"""
+ if warpings is None: warpings = [None] * self.nsamples
if times is None: times = [0.] * self.nsamples
filesOut = [None] * self.nsamples
for j in range(self.nsamples):
- filesOut[j] = self.HFEngine.outParaview(self.samples[j], None,
- False, "{}_{}".format(name, j),
- "{}_{}".format(filename, j),
- times[j], what, forceNewFile,
- folders, filePW)
+ filesOut[j] = self.HFEngine.outParaview(self.samples[j],
+ warpings[j],
+ self.sample_state,
+ "{}_{}".format(name, j),
+ "{}_{}".format(filename, j),
+ times[j], **kwargs)
if filesOut[0] is None: return None
return filesOut
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
+ warpings : List[List[callable]] = None,
timeFinal : Np1D = None,
- periodResolution : int = 20,
- name : str = "u", folders : bool = True,
- filename : str = "out",
- forceNewFile : bool = True) -> List[str]:
+ periodResolution : List[int] = 20,
+ name : str = "u", filename : str = "out",
+ **kwargs) -> List[str]:
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
- folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
- forceNewFile(optional): Whether to create new output file.
Returns:
Output filename.
"""
if omegas is None: omegas = np.real(self.mus)
+ if warpings is None: warpings = [None] * self.nsamples
if not isinstance(timeFinal, (list, tuple,)):
timeFinal = [timeFinal] * self.nsamples
+ if not isinstance(periodResolution, (list, tuple,)):
+ periodResolution = [periodResolution] * self.nsamples
filesOut = [None] * self.nsamples
for j in range(self.nsamples):
filesOut[j] = self.HFEngine.outParaviewTimeDomain(self.samples[j],
- omegas[j], None, False,
+ omegas[j], warpings[j],
+ self.sample_state,
timeFinal[j],
- periodResolution,
+ periodResolution[j],
"{}_{}".format(name, j),
"{}_{}".format(filename, j),
- forceNewFile, folders)
+ **kwargs)
if filesOut[0] is None: return None
return filesOut
diff --git a/rrompy/sampling/base/sampling_engine_base_pivoted.py b/rrompy/sampling/base/sampling_engine_base_pivoted.py
index e9eda0d..9a93ab6 100644
--- a/rrompy/sampling/base/sampling_engine_base_pivoted.py
+++ b/rrompy/sampling/base/sampling_engine_base_pivoted.py
@@ -1,234 +1,222 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
-from rrompy.utilities.base.types import (Np1D, HFEng, List, ListAny, strLst,
- paramVal, paramList, sampList)
+from rrompy.utilities.base.types import (Np1D, HFEng, List, ListAny, paramVal,
+ paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import emptySampleList
from .sampling_engine_base import SamplingEngineBase
__all__ = ['SamplingEngineBasePivoted']
class SamplingEngineBasePivoted(SamplingEngineBase):
"""HERE"""
def __init__(self, HFEngine:HFEng, directionPivot:ListAny,
*args, **kwargs):
super().__init__(HFEngine, *args, **kwargs)
self.directionPivot = directionPivot
self.HFEngineMarginalized = None
self.resetHistory()
@property
def directionMarginal(self):
return tuple([x for x in range(self.HFEngine.npar) \
if x not in self.directionPivot])
@property
def nPivot(self):
return len(self.directionPivot)
@property
def nMarginal(self):
return len(self.directionMarginal)
@property
def nsamplesTot(self):
return np.sum(self.nsamples)
def resetHistory(self, j : int = 1):
self.samples = [emptySampleList() for _ in range(j)]
self.nsamples = [0] * j
self.mus = [emptyParameterList() for _ in range(j)]
self._derIdxs = [[] for _ in range(j)]
def popSample(self, j:int):
if hasattr(self, "nsamples") and self.nsamples[j] > 1:
if self.samples[j].shape[1] > self.nsamples[j]:
RROMPyWarning(("More than 'nsamples' memory allocated for "
"samples. Popping empty sample column."))
self.nsamples[j] += 1
self.nsamples[j] -= 1
self.samples[j].pop()
self.mus[j].pop()
else:
self.resetHistory()
def preallocateSamples(self, u:sampList, mu:paramVal, n:int, j:int):
self.samples[j].reset((u.shape[0], n), u.dtype)
self.samples[j][0] = u
mu = checkParameter(mu, self.nPivot)
self.mus[j].reset((n, self.nPivot))
self.mus[j][0] = mu[0]
def coalesceSamples(self):
vbMng(self, "INIT", "Coalescing samples.", 7)
self.samplesCoalesced = emptySampleList()
self.samplesCoalesced.reset((self.samples[0].shape[0],
np.sum([samp.shape[1] \
for samp in self.samples])),
self.samples[0].dtype)
run_idx = 0
for samp in self.samples:
slen = samp.shape[1]
self.samplesCoalesced.data[:, run_idx : run_idx + slen] = samp.data
run_idx += slen
vbMng(self, "DEL", "Done coalescing samples.", 7)
def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList:
"""
Solve linear system.
Args:
mu: Parameter value.
Returns:
Solution of system.
"""
mu = checkParameterList(mu, self.nPivot)[0]
vbMng(self, "INIT",
("Solving HF model for muPivot = {} and muMarginal = "
"{}.").format(mu, self.HFEngineMarginalized.muFixed), 15)
u = self.HFEngineMarginalized.solve(mu, RHS,
return_state = self.sample_state,
verbose = (self.verbosity >= 20))
vbMng(self, "DEL", "Done solving HF model.", 15)
return u
- def plotSamples(self, warping : List[callable] = None, name : str = "u",
- save : str = None, what : strLst = 'all',
- saveFormat : str = "eps", saveDPI : int = 100,
- show : bool = True, plotArgs : dict = {},
- **figspecs) -> List[str]:
+ def plotSamples(self, warpings : List[List[List[callable]]] = None,
+ name : str = "u", **kwargs) -> List[List[str]]:
"""
Do some nice plots of the samples.
Args:
- warping(optional): Domain warping functions.
+ warpings(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
- save(optional): Where to save plot(s). Defaults to None, i.e. no
- saving.
- what(optional): Which plots to do. If list, can contain 'ABS',
- 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
- Defaults to 'ALL'.
- saveFormat(optional): Format for saved plot(s). Defaults to "eps".
- saveDPI(optional): DPI for saved plot(s). Defaults to 100.
- show(optional): Whether to show figure. Defaults to True.
- plotArgs(optional): Optional arguments for fen/pyplot.
- figspecs(optional key args): Optional arguments for matplotlib
- figure creation.
Returns:
Output filenames.
"""
+ if warpings is None: warpings = [[None] * self.nsamples[i] \
+ for i in range(len(self.nsamples))]
filesOut = []
for i in range(len(self.nsamples)):
filesOuti = [None] * self.nsamples[i]
for j in range(self.nsamples[i]):
filesOuti[j] = self.HFEngine.plot(self.samples[i][j],
- warping, False,
+ warpings[i][j],
+ self.sample_state,
"{}_{}_{}".format(name, i, j),
- save, what, saveFormat,
- saveDPI, show, plotArgs,
- **figspecs)
+ **kwargs)
filesOut += [filesOuti]
if filesOut[0][0] is None: return None
return filesOut
- def outParaviewSamples(self, name : str = "u", folders : bool = True,
- filename : str = "out", times : Np1D = None,
- what : strLst = 'all', forceNewFile : bool = True,
- filePW = None) -> List[str]:
+ def outParaviewSamples(self, warpings : List[List[List[callable]]] = None,
+ name : str = "u", filename : str = "out",
+ times : List[Np1D] = None,
+ **kwargs) -> List[List[str]]:
"""
Output samples to ParaView file.
Args:
+ warpings(optional): Domain warping functions.
name(optional): Base name to be used for data output.
- folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
times(optional): Timestamps.
- what(optional): Which plots to do. If list, can contain 'MESH',
- 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
- 'ALL'. Defaults to 'ALL'.
- forceNewFile(optional): Whether to create new output file.
- filePW(optional): Fenics File entity (for time series).
Returns:
Output filenames.
"""
+ if warpings is None: warpings = [[None] * self.nsamples[i] \
+ for i in range(len(self.nsamples))]
if times is None: times = [[0.] * self.nsamples[i] \
for i in range(len(self.nsamples))]
filesOut = []
for i in range(len(self.nsamples)):
filesOuti = [None] * self.nsamples[i]
for j in range(self.nsamples[i]):
- filesOuti[j] = self.HFEngine.outParaview(
- self.samples[i][j], None, False,
+ filesOuti[j] = self.HFEngine.outParaview(self.samples[i][j],
+ warpings[i][j], self.sample_state,
"{}_{}_{}".format(name, i, j),
"{}_{}_{}".format(filename, i, j),
- times[i][j], what, forceNewFile,
- folders, filePW)
+ times[i][j], **kwargs)
filesOut += [filesOuti]
if filesOut[0][0] is None: return None
return filesOut
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
- timeFinal : Np1D = None,
- periodResolution : int = 20,
- name : str = "u", folders : bool = True,
- filename : str = "out",
- forceNewFile : bool = True) -> List[str]:
+ warpings : List[List[List[callable]]] = None,
+ timeFinal : Np1D = None,
+ periodResolution : List[List[int]] = 20,
+ name : str = "u", filename : str = "out",
+ **kwargs) -> List[List[str]]:
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
+ warpings(optional): Domain warping functions.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
- folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
- forceNewFile(optional): Whether to create new output file.
Returns:
Output filenames.
"""
- if omegas is None: omegas = [[np.real(self.mus[i])] \
+ if omegas is None: omegas = [np.real(self.mus[i]) \
+ for i in range(len(self.nsamples))]
+ if warpings is None: warpings = [[None] * self.nsamples[i] \
for i in range(len(self.nsamples))]
if not isinstance(timeFinal, (list, tuple,)):
timeFinal = [[timeFinal] * self.nsamples[i] \
for i in range(len(self.nsamples))]
+ if not isinstance(periodResolution, (list, tuple,)):
+ periodResolution = [[periodResolution] * self.nsamples[i] \
+ for i in range(len(self.nsamples))]
filesOut = []
for i in range(len(self.nsamples)):
filesOuti = [None] * self.nsamples[i]
for j in range(self.nsamples[i]):
filesOuti[j] = self.HFEngine.outParaviewTimeDomain(
self.samples[i][j], omegas[i][j],
- None, False, timeFinal[i][j],
- periodResolution,
+ warpings[i][j], self.sample_state,
+ timeFinal[i][j],
+ periodResolution[i][j],
"{}_{}_{}".format(name, i, j),
"{}_{}_{}".format(filename, i, j),
- forceNewFile, folders)
+ **kwargs)
filesOut += [filesOuti]
if filesOut[0][0] is None: return None
return filesOut