diff --git a/rrompy/hfengines/base/fenics_engine_base.py b/rrompy/hfengines/base/fenics_engine_base.py index 783343a..2a2c303 100644 --- a/rrompy/hfengines/base/fenics_engine_base.py +++ b/rrompy/hfengines/base/fenics_engine_base.py @@ -1,514 +1,514 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from os import path, mkdir import fenics as fen import numpy as np from matplotlib import pyplot as plt from .scipy_engine_base import ScipyEngineBase, checknports from rrompy.utilities.base.types import (Np1D, strLst, FenFunc, Tuple, List, FigHandle) from rrompy.utilities.base.data_structures import purgeList, getNewFilename from rrompy.utilities.base import verbosityManager as vbMng from rrompy.solver.fenics import (L2NormMatrix, fenplot, interp_project, serializeFunctionSpace) from .boundary_conditions import BoundaryConditions from rrompy.utilities.exception_manager import RROMPyException from rrompy.utilities.parallel import (SELF, masterCore, bcast, indicesScatter, listGather) __all__ = ['FenicsEngineBase', 'FenicsEngineBaseTensorized'] def plottingBaseFen(u, fig, V, what, nRows, subplotidx, warping, name, colorbar, fenplotArgs): if 'ABS' in what: uAb = fen.Function(V) uAb.vector().set_local(np.abs(u)) subplotidx = subplotidx + 1 ax = fig.add_subplot(nRows, len(what), subplotidx) p = fenplot(uAb, warping = warping, title = "|{}|".format(name), **fenplotArgs) if colorbar: fig.colorbar(p, ax = ax) if 'PHASE' in what: uPh = fen.Function(V) uPh.vector().set_local(np.angle(u)) subplotidx = subplotidx + 1 ax = fig.add_subplot(nRows, len(what), subplotidx) p = fenplot(uPh, warping = warping, title = "phase({})".format(name), **fenplotArgs) if colorbar: fig.colorbar(p, ax = ax) if 'REAL' in what: uRe = fen.Function(V) uRe.vector().set_local(np.real(u)) subplotidx = subplotidx + 1 ax = fig.add_subplot(nRows, len(what), subplotidx) p = fenplot(uRe, warping = warping, title = "Re({})".format(name), **fenplotArgs) if colorbar: fig.colorbar(p, ax = ax) if 'IMAG' in what: uIm = fen.Function(V) uIm.vector().set_local(np.imag(u)) subplotidx = subplotidx + 1 ax = fig.add_subplot(nRows, len(what), subplotidx) p = fenplot(uIm, warping = warping, title = "Im({})".format(name), **fenplotArgs) if colorbar: fig.colorbar(p, ax = ax) class FenicsEngineBase(ScipyEngineBase): """Generic solver for parametric fenics problems.""" def __init__(self, degree_threshold : int = np.inf, verbosity : int = 10, timestamp : bool = True): super().__init__(verbosity = verbosity, timestamp = timestamp) self.BCManager = BoundaryConditions("Dirichlet") self.V = fen.FunctionSpace(fen.UnitSquareMesh(SELF, 1, 1), "P", 1) self.degree_threshold = degree_threshold @property def V(self): """Value of V.""" return self._V @V.setter def V(self, V): if not type(V).__name__ == 'FunctionSpace': raise RROMPyException("V type not recognized.") self.dsToBeSet = True self._V = serializeFunctionSpace(V) self.u = fen.TrialFunction(self._V) self.v = fen.TestFunction(self._V) @property def spacedim(self): if hasattr(self, "_V"): return self.V.dim() return super().spacedim def autoSetDS(self): """Set FEniCS boundary measure based on boundary function handles.""" if self.dsToBeSet: vbMng(self, "INIT", "Initializing boundary measures.", 20) mesh = self.V.mesh() NB = self.NeumannBoundary RB = self.RobinBoundary boundary_markers = fen.MeshFunction("size_t", mesh, mesh.topology().dim() - 1) NB.mark(boundary_markers, 0) RB.mark(boundary_markers, 1) self.ds = fen.Measure("ds", domain = mesh, subdomain_data = boundary_markers) self.dsToBeSet = False vbMng(self, "DEL", "Done assembling boundary measures.", 20) def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = L2NormMatrix(self.V) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() self.energyNormDualMatrix = self.energyNormMatrix def liftDirichletData(self) -> Np1D: """Lift Dirichlet datum.""" if not hasattr(self, "_liftedDirichletDatum"): liftRe = interp_project(self.DirichletDatum[0], self.V) liftIm = interp_project(self.DirichletDatum[1], self.V) self._liftedDirichletDatum = (np.array(liftRe.vector()) + 1.j * np.array(liftIm.vector())) return self._liftedDirichletDatum def reduceQuadratureDegree(self, fun:FenFunc, name:str): """Check whether to reduce compiler parameters to degree threshold.""" if not np.isinf(self.degree_threshold): from ufl.algorithms.estimate_degrees import ( estimate_total_polynomial_degree as ETPD) try: deg = ETPD(fun) except: return False if deg > self.degree_threshold: vbMng(self, "MAIN", ("Reducing quadrature degree from {} to {} for " "{}.").format(deg, self.degree_threshold, name), 15) return True return False def iterReduceQuadratureDegree(self, funsNames:List[Tuple[FenFunc, str]]): """ Iterate reduceQuadratureDegree over list and define reduce compiler parameters. """ if funsNames is not None: for fun, name in funsNames: if self.reduceQuadratureDegree(fun, name): return {"quadrature_degree" : self.degree_threshold} return {} def plot(self, u:Np1D, warping : List[callable] = None, is_state : bool = False, name : str = "u", save : str = None, what : strLst = 'all', forceNewFile : bool = True, saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, colorMap : str = "jet", fenplotArgs : dict = {}, **figspecs) -> Tuple[FigHandle, str]: """ Do some nice plots of the complex-valued function with given dofs. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. is_state(optional): whether given u is value before multiplication by c. Defaults to False. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. colorMap(optional): Pyplot colormap. Defaults to 'jet'. fenplotArgs(optional): Optional arguments for fenplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filename and figure handle. """ if not is_state and not self.isCEye: return super().plot(u, warping, False, name, save, what, forceNewFile, saveFormat, saveDPI, show, colorMap, fenplotArgs, **figspecs) if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return out = None if masterCore(): if 'figsize' not in figspecs.keys(): figspecs['figsize'] = plt.figaspect(1. / len(what)) fig = plt.figure(**figspecs) plt.set_cmap(colorMap) plottingBaseFen(u, fig, self.V, what, 1, 0, warping, name, self.V.mesh().geometric_dimension() > 1, fenplotArgs) plt.tight_layout() if save is not None: save = save.strip() if forceNewFile: fileOut = getNewFilename("{}_fig_".format(save), saveFormat) else: fileOut = "{}_fig.{}".format(save, saveFormat) fig.savefig(fileOut, format = saveFormat, dpi = saveDPI) else: fileOut = None if show: plt.show() out = fig if fileOut is None else (fig, fileOut) return bcast(out) def plotmesh(self, warping : List[callable] = None, name : str = "Mesh", save : str = None, forceNewFile : bool = True, saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, fenplotArgs : dict = {}, **figspecs) -> Tuple[FigHandle, str]: """ Do a nice plot of the mesh. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. forceNewFile(optional): Whether to create new output file. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. fenplotArgs(optional): Optional arguments for fenplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filename and figure handle. """ out = None if masterCore(): fig = plt.figure(**figspecs) fenplot(self.V.mesh(), warping = warping, **fenplotArgs) plt.tight_layout() if save is not None: save = save.strip() if forceNewFile: fileOut = getNewFilename("{}_msh_".format(save), saveFormat) else: fileOut = "{}_msh.{}".format(save, saveFormat) fig.savefig(fileOut, format = saveFormat, dpi = saveDPI) else: fileOut = None if show: plt.show() out = fig if fileOut is None else (fig, fileOut) return bcast(out) def outParaview(self, u:Np1D, warping : List[callable] = None, is_state : bool = False, name : str = "u", filename : str = "out", time : float = 0., what : strLst = 'all', forceNewFile : bool = True, folder : bool = False, filePW = None) -> str: """ Output complex-valued function with given dofs to ParaView file. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. is_state(optional): whether given u is value before multiplication by c. Defaults to False. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. folder(optional): Whether to create an additional folder layer. filePW(optional): Fenics File entity (for time series). Returns: Output filename. """ if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return filePW = None if masterCore(): if filePW is None: if folder: if not path.exists(filename + "/"): mkdir(filename) idxpath = filename.rfind("/") filename += "/" + filename[idxpath + 1 :] if forceNewFile: filePW = fen.File(getNewFilename(filename, "pvd")) else: filePW = fen.File("{}.pvd".format(filename)) if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[0], self.V.mesh())) if what == ['MESH']: filePW << (self.V.mesh(), time) if 'ABS' in what: uAb = fen.Function(self.V, name = "{}_ABS".format(name)) uAb.vector().set_local(np.abs(u)) filePW << (uAb, time) if 'PHASE' in what: uPh = fen.Function(self.V, name = "{}_PHASE".format(name)) uPh.vector().set_local(np.angle(u)) filePW << (uPh, time) if 'REAL' in what: uRe = fen.Function(self.V, name = "{}_REAL".format(name)) uRe.vector().set_local(np.real(u)) filePW << (uRe, time) if 'IMAG' in what: uIm = fen.Function(self.V, name = "{}_IMAG".format(name)) uIm.vector().set_local(np.imag(u)) filePW << (uIm, time) if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[1], self.V.mesh())) return bcast(filePW) def outParaviewTimeDomain(self, u:Np1D, omega:float, warping : List[callable] = None, is_state : bool = False, timeFinal : float = None, periodResolution : int = 20, name : str = "u", filename : str = "out", forceNewFile : bool = True, folder : bool = False) -> str: """ Output complex-valued function with given dofs to ParaView file, converted to time domain. Args: u: numpy complex array with function dofs. omega: frequency. warping(optional): Domain warping functions. is_state(optional): whether given u is value before multiplication by c. Defaults to False. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. folder(optional): Whether to create an additional folder layer. Returns: Output filename. """ if not is_state and not self.isCEye: raise RROMPyException(("Cannot output to Paraview non-state " "object.")) filePW = None if masterCore(): if folder: if not path.exists(filename + "/"): mkdir(filename) idxpath = filename.rfind("/") filename += "/" + filename[idxpath + 1 :] if forceNewFile: filePW = fen.File(getNewFilename(filename, "pvd")) else: filePW = fen.File("{}.pvd".format(filename)) - omega = np.abs(omega) t = 0. - dt = 2. * np.pi / omega / periodResolution - if timeFinal is None: timeFinal = 2. * np.pi / omega - dt + dt = 2. * np.pi / np.abs(omega) / periodResolution + if timeFinal is None: + timeFinal = 2. * np.pi / np.abs(omega) - dt if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[0], self.V.mesh())) for j in range(int(np.ceil(timeFinal / dt)) + 1): ut = fen.Function(self.V, name = name) ut.vector().set_local(np.real(u) * np.cos(omega * t) + np.imag(u) * np.sin(omega * t)) filePW << (ut, t) t += dt if warping is not None: fen.ALE.move(self.V.mesh(), interp_project(warping[1], self.V.mesh())) return bcast(filePW) class FenicsEngineBaseTensorized(FenicsEngineBase): """The number of tensorized dimensions should be assigned to nports.""" def plot(self, u:Np1D, warping : List[callable] = None, is_state : bool = False, name : str = "u", save : str = None, what : strLst = 'all', forceNewFile : bool = True, saveFormat : str = "eps", saveDPI : int = 100, show : bool = True, colorMap : str = "jet", fenplotArgs : dict = {}, **figspecs) -> Tuple[FigHandle, str]: """ Do some nice plots of the complex-valued function with given dofs. Args: u: numpy complex array with function dofs. warping(optional): Domain warping functions. is_state(optional): whether given u is value before multiplication by c. Defaults to False. name(optional): Name to be shown as title of the plots. Defaults to 'u'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. colorMap(optional): Pyplot colormap. Defaults to 'jet'. fenplotArgs(optional): Optional arguments for fenplot. figspecs(optional key args): Optional arguments for matplotlib figure creation. Returns: Output filename and figure handle. """ nP = checknports(self) if not is_state and not self.isCEye: return super().plot(u.reshape(-1, nP), warping, False, name, save, what, forceNewFile, saveFormat, saveDPI, show, colorMap, fenplotArgs, **figspecs) if isinstance(what, (str,)): if what.upper() == 'ALL': what = ['ABS', 'PHASE', 'REAL', 'IMAG'] else: what = [what] what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'], listname = self.name() + ".what", baselevel = 1) if len(what) == 0: return out = None if masterCore(): if 'figsize' not in figspecs.keys(): figspecs['figsize'] = plt.figaspect(1. / len(what)) figspecs['figsize'][1] *= nP fig = plt.figure(**figspecs) plt.set_cmap(colorMap) for i in range(nP): plottingBaseFen(u[i :: nP], fig, self.V, what, nP, i * len(what), warping, "{}_port{}".format(name, i + 1), self.V.mesh().geometric_dimension() > 1, fenplotArgs) plt.tight_layout() if save is not None: save = save.strip() if forceNewFile: fileOut = getNewFilename("{}_fig_".format(save), saveFormat) else: fileOut = "{}_fig.{}".format(save, saveFormat) fig.savefig(fileOut, format = saveFormat, dpi = saveDPI) else: fileOut = None if show: plt.show() out = fig if fileOut is None else (fig, fileOut) return bcast(out) def outParaview(self, u:Np1D, *args, **kwargs) -> List[str]: nP = checknports(self) idx = indicesScatter(nP)[0] filesOut = [] if len(idx) > 0: for j in idx: filesOut += super().outParaview(u[j :: nP], *args, **kwargs) filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut def outParaviewTimeDomain(self, u:Np1D, *args, **kwargs) -> List[str]: nP = checknports(self) idx = indicesScatter(nP)[0] filesOut = [] if len(idx) > 0: for j in idx: filesOut += super().outParaviewTimeDomain(u[j :: nP], *args, **kwargs) filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut diff --git a/rrompy/hfengines/base/hfengine_base.py b/rrompy/hfengines/base/hfengine_base.py index c875b2a..e444519 100644 --- a/rrompy/hfengines/base/hfengine_base.py +++ b/rrompy/hfengines/base/hfengine_base.py @@ -1,377 +1,386 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np import scipy.sparse as scsp from numbers import Number from collections.abc import Iterable from copy import copy as softcopy from rrompy.utilities.base.decorators import (nonaffine_construct, mu_independent) from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, DictAny, paramVal, paramList, sampList) from rrompy.utilities.numerical import solve as tsolve, dot, potential from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.sampling.sample_list import sampleList from rrompy.parameter import (checkParameter, checkParameterList, parameterList, parameterMap as pMap) from rrompy.solver.linear_solver import setupSolver from rrompy.utilities.parallel import (poolRank, masterCore, listScatter, matrixGatherv, isend, recv) __all__ = ['HFEngineBase'] class HFEngineBase: """Generic solver for parametric problems.""" def __init__(self, verbosity : int = 10, timestamp : bool = True): self.verbosity = verbosity self.timestamp = timestamp self.setSolver("SPSOLVE", {"use_umfpack" : False}) self.npar = 0 self._C = None def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def __dir_base__(self): return [x for x in self.__dir__() if x[:2] != "__"] def __deepcopy__(self, memo): return softcopy(self) @property def npar(self): """Value of npar.""" return self._npar @npar.setter def npar(self, npar): nparOld = self._npar if hasattr(self, "_npar") else -1 if npar != nparOld: self.parameterMap = pMap(1., npar) self._npar = npar @property def spacedim(self): return 1 def checkParameter(self, mu:paramVal) -> paramVal: muP = checkParameter(mu, self.npar) if self.npar == 0: muP.reset((1, 0), muP.dtype) return muP def checkParameterList(self, mu:paramList, check_if_single : bool = False) -> paramList: muL = checkParameterList(mu, self.npar, check_if_single) return muL def mapParameterList(self, mu:paramList, direct : str = "F", idx : List[int] = None) -> paramList: if idx is None: idx = np.arange(self.npar) muMapped = checkParameterList(mu, len(idx)) for j, d in enumerate(idx): muMapped.data[:, j] = expressionEvaluator( self.parameterMap[direct][d], muMapped(j)).flatten() return muMapped def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ self.energyNormMatrix = 1. def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ self.energyNormDualMatrix = 1. + def buildEnergyNormOutput(self): + """ + Build sparse matrix (in CSR format) representative of scalar product + over output space. + """ + self.energyNormOutputMatrix = 1. + def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False, dual : bool = False, is_state : bool = True) -> Np2D: """Scalar product.""" if is_state or self.isCEye: if dual: if not hasattr(self, "energyNormDualMatrix"): self.buildEnergyNormDualForm() energyMat = self.energyNormDualMatrix else: if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() energyMat = self.energyNormMatrix else: - energyMat = 1. + if not hasattr(self, "energyNormOutputMatrix"): + self.buildEnergyNormOutput() + energyMat = self.energyNormOutputMatrix if isinstance(u, (parameterList, sampleList)): u = u.data if isinstance(v, (parameterList, sampleList)): v = v.data if onlyDiag: return np.sum(dot(energyMat, u) * v.conj(), axis = 0) return dot(dot(energyMat, u).T, v.conj()).T def norm(self, u:Np2D, dual : bool = False, is_state : bool = True) -> Np1D: return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual, is_state = is_state)) ** .5 def baselineA(self): """Return 0 of shape consistent with operator of linear system.""" if (hasattr(self, "As") and isinstance(self.As, Iterable) and self.As[0] is not None): d = self.As[0].shape[0] else: d = self.spacedim return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)), shape = (d, d), dtype = np.complex) def baselineb(self): """Return 0 of shape consistent with RHS of linear system.""" return np.zeros(self.spacedim, dtype = np.complex) @nonaffine_construct @abstractmethod def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ return @nonaffine_construct @abstractmethod def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ return @mu_independent def C(self, mu:paramVal): """ Value of C. Should be overridden (with something like return self._C(mu) ) if a mu-dependent C is needed. """ if self._C is None: self._C = 1. return self._C @property def isCEye(self): """ Whether the action of C can be seen as a scalar multiplication. Should be overridden (with return True ) if a mu-dependent scalar C is used. """ return isinstance(self._C, Number) def applyC(self, u:sampList, mu:paramVal): """Apply LHS of linear system.""" return dot(self.C(mu), u) def setSolver(self, solverType:str, solverArgs : DictAny = {}): """Choose solver type and parameters.""" self._solver, self._solverArgs = setupSolver(solverType, solverArgs) def solve(self, mu : paramList = [], RHS : sampList = None, return_state : bool = False) -> sampList: """ Find solution of linear system. Args: mu: parameter value. RHS: RHS of linear system. If None, defaults to that of parametric system. Defaults to None. return_state: whether to return state before multiplication by c. Defaults to False. """ from rrompy.sampling import sampleList, emptySampleList if mu == []: mu = self.mu0 mu = self.checkParameterList(mu) if len(mu) == 0: return emptySampleList() mu = self.checkParameterList(mu) mu_loc, idx, sizes = listScatter(mu, return_sizes = True) mu_loc = self.checkParameterList(mu_loc) req, emptyCores = [], np.where(sizes == 0)[0] if len(mu_loc) == 0: uL, uT = recv(source = 0, tag = poolRank()) sol = np.empty((uL, 0), dtype = uT) else: if RHS is None: # build RHSs RHS = sampleList([self.b(m) for m in mu_loc]) else: RHS = sampleList(RHS) if len(RHS) > 1: RHS = sampleList([RHS[i] for i in idx]) mult = 0 if len(RHS) == 1 else 1 RROMPyAssert(mult * (len(mu_loc) - 1) + 1, len(RHS), "Sample size") for j, mj in enumerate(mu_loc): u = tsolve(self.A(mj), RHS[mult * j], self._solver, self._solverArgs) if not return_state: u = self.applyC(u, mj) if j == 0: sol = np.empty((len(u), len(mu_loc)), dtype = u.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(u), u.dtype), dest = dest, tag = dest)] sol[:, j] = u for r in req: r.wait() sol = matrixGatherv(sol, sizes) return sampleList(sol) def residual(self, mu : paramList = [], u : sampList = None, post_c : bool = True) -> sampList: """ Find residual of linear system for given approximate solution. Args: mu: parameter value. u: numpy complex array with function dofs. If None, set to 0. post_c: whether to post-process using c. Defaults to True. """ from rrompy.sampling import sampleList, emptySampleList if mu == []: mu = self.mu0 mu = self.checkParameterList(mu) if len(mu) == 0: return emptySampleList() mu_loc, idx, sizes = listScatter(mu, return_sizes = True) mu_loc = self.checkParameterList(mu_loc) req, emptyCores = [], np.where(sizes == 0)[0] if len(mu_loc) == 0: uL, uT = recv(source = 0, tag = poolRank()) res = np.empty((uL, 0), dtype = uT) else: v = sampleList(np.zeros((self.spacedim, len(mu_loc)))) if u is not None: u = sampleList(u) v = v + sampleList([u[i] for i in idx]) for j, (mj, vj) in enumerate(zip(mu_loc, v)): r = self.b(mj) - dot(self.A(mj), vj) if post_c: r = self.applyC(r, mj) if j == 0: res = np.empty((len(r), len(mu_loc)), dtype = r.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(r), r.dtype), dest = dest, tag = dest)] res[:, j] = r for r in req: r.wait() res = matrixGatherv(res, sizes) return sampleList(res) cutOffPolesRMax,cutOffPolesRMin = np.inf, - np.inf cutOffPolesIMax, cutOffPolesIMin = np.inf, - np.inf def flagBadPolesResiduesAbsolute(self, poles:Np1D, residues : Np1D = None, projMat : Np2D = None) -> Np1D: """ Flag (numerical) poles/residues which are impossible. Args: poles: poles to be judged. residues: residues norms to be judged. projMat: matrix for projection of residues. """ poles = np.array(poles).flatten() flag = np.zeros(len(poles), dtype = bool) RMax, RMin = self.cutOffPolesRMax, self.cutOffPolesRMin IMax, IMin = self.cutOffPolesIMax, self.cutOffPolesIMin if not np.isinf(RMax): flag = flag + (np.real(poles) > RMax) if not np.isinf(RMin): flag = flag + (np.real(poles) < RMin) if not np.isinf(IMax): flag = flag + (np.imag(poles) > IMax) if not np.isinf(IMin): flag = flag + (np.imag(poles) < IMin) return flag cutOffPolesPotentialMax = np.inf cutOffPolesRMaxRel, cutOffPolesRMinRel = np.inf, - np.inf cutOffPolesIMaxRel, cutOffPolesIMinRel = np.inf, - np.inf cutOffResNormMin = -1 cutOffResAngleMin, cutOffResAngleMax = -1, np.pi + 1 def flagBadPolesResiduesRelative(self, poles:Np1D, residues : Np1D = None, projMat : Np2D = None, foci : Tuple[float, float] = [-1., 1.]) \ -> Np1D: """ Flag (numerical) poles/residues which are impossible. Args: poles: poles to be judged. residues: residues norms to be judged. projMat: matrix for projection of residues. foci: foci for potential evaluation. """ poles = np.array(poles).flatten() flag = np.zeros(len(poles), dtype = bool) potMax = self.cutOffPolesPotentialMax RMax, RMin = self.cutOffPolesRMaxRel, self.cutOffPolesRMinRel IMax, IMin = self.cutOffPolesIMaxRel, self.cutOffPolesIMinRel if not np.isinf(potMax) or (residues is not None and not self._ignoreResidues): plsInf = np.isinf(poles) pot = potential(poles, foci) if not np.isinf(potMax): flag = flag + (pot > potMax) if not np.isinf(RMax): flag = flag + (np.real(poles) > RMax) if not np.isinf(RMin): flag = flag + (np.real(poles) < RMin) if not np.isinf(IMax): flag = flag + (np.imag(poles) > IMax) if not np.isinf(IMin): flag = flag + (np.imag(poles) < IMin) if residues is not None and not self._ignoreResidues: residues = np.array(residues).reshape(-1, len(poles)) resGood = np.where(flag + plsInf == False)[0] if len(resGood) > 0: residues = residues[:, resGood] / pot[resGood] if projMat is None: resNorm = np.linalg.norm(residues, axis = 0) else: residues = projMat.dot(residues) resNorm = self.norm(residues) if self.cutOffResNormMin > 0.: flag[resGood[resNorm < self.cutOffResNormMin * np.max(resNorm)]] = 1 resGood = np.where(flag + plsInf == False)[0] if len(resGood) > 0 and (self.cutOffResAngleMin > 0. or self.cutOffResAngleMax < np.pi): if projMat is None: angles = np.real(residues.T.conj().dot(residues)) else: angles = np.real(self.innerProduct(residues, residues)) resNormEff = resNorm resNormEff[np.isclose(resNormEff, 0., atol = 1e-15)] = 1. angles = np.clip((angles / resNormEff).T / resNormEff, -1., 1.) angles = np.arccos(angles) badangles = ((angles < self.cutOffResAngleMin) + (angles > self.cutOffResAngleMax)) badangles[np.arange(len(angles)), np.arange(len(angles))] = 0 idx = np.zeros(len(angles), dtype = bool) while np.sum(badangles) > 0: idxn = np.argmax(np.sum(badangles, axis = 1)) badangles[idxn], badangles[:, idxn] = 0, 0 idx[idxn] = True flag[resGood[idx]] = 1 return flag > 0 @property def _ignoreResidues(self): return (self.cutOffResNormMin <= 0. and self.cutOffResAngleMin <= 0. and self.cutOffResAngleMax >= np.pi) diff --git a/rrompy/hfengines/fenics_engines/helmholtz_problem_engine.py b/rrompy/hfengines/fenics_engines/helmholtz_problem_engine.py index ff062b0..0163350 100644 --- a/rrompy/hfengines/fenics_engines/helmholtz_problem_engine.py +++ b/rrompy/hfengines/fenics_engines/helmholtz_problem_engine.py @@ -1,230 +1,232 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import fenics as fen from .laplace_base_problem_engine import LaplaceBaseProblemEngine from rrompy.solver.fenics import fenZERO, fenONE, fenics2Sparse from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import RROMPyWarning from rrompy.parameter import parameterMap as pMap __all__ = ['HelmholtzProblemEngine', 'ScatteringProblemEngine'] class HelmholtzProblemEngine(LaplaceBaseProblemEngine): """ Solver for generic Helmholtz problems with parametric wavenumber. - \nabla \cdot (a \nabla u) - omega^2 * n**2 * u = f in \Omega u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu + h u = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real FE space. u: Generic trial functions for variational form evaluation. v: Generic test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. omega: Value of omega. diffusivity: Value of a. refractionIndex: Value of n. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. RobinDatumH: Value of h. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._affinePoly = True self.nAs = 2 self.parameterMap = pMap([2.] + [1.] * (self.npar - 1)) self.refractionIndex = fenONE @property def refractionIndex(self): """Value of n.""" return self._refractionIndex @refractionIndex.setter def refractionIndex(self, refractionIndex): self.resetAs() if not isinstance(refractionIndex, (list, tuple,)): refractionIndex = [refractionIndex, fenZERO] self._refractionIndex = refractionIndex def buildA(self): """Build terms of operator of linear system.""" if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs) if self.As[0] is None: self.autoSetDS() vbMng(self, "INIT", "Assembling operator term A0.", 20) DirichletBC0 = fen.DirichletBC(self.V, fenZERO, self.DirichletBoundary) aRe, aIm = self.diffusivity hRe, hIm = self.RobinDatumH termNames = ["diffusivity", "RobinDatumH"] parsRe = self.iterReduceQuadratureDegree(zip( [aRe, hRe], [x + "Real" for x in termNames])) parsIm = self.iterReduceQuadratureDegree(zip( [aIm, hIm], [x + "Imag" for x in termNames])) - a0Re = (aRe * fen.dot(fen.grad(self.u), fen.grad(self.v)) * fen.dx - + hRe * fen.dot(self.u, self.v) * self.ds(1)) - a0Im = (aIm * fen.dot(fen.grad(self.u), fen.grad(self.v)) * fen.dx - + hIm * fen.dot(self.u, self.v) * self.ds(1)) + a0Re = (aRe * fen.inner(fen.grad(self.u), + fen.grad(self.v)) * fen.dx + + hRe * self.u * self.v * self.ds(1)) + a0Im = (aIm * fen.inner(fen.grad(self.u), + fen.grad(self.v)) * fen.dx + + hIm * self.u * self.v * self.ds(1)) self.As[0] = (fenics2Sparse(a0Re, parsRe, DirichletBC0, 1) + 1.j * fenics2Sparse(a0Im, parsIm, DirichletBC0, 0)) vbMng(self, "DEL", "Done assembling operator term.", 20) if self.As[1] is None: vbMng(self, "INIT", "Assembling operator term A1.", 20) DirichletBC0 = fen.DirichletBC(self.V, fenZERO, self.DirichletBoundary) nRe, nIm = self.refractionIndex n2Re, n2Im = nRe * nRe - nIm * nIm, 2 * nRe * nIm parsRe = self.iterReduceQuadratureDegree(zip([n2Re], ["refractionIndexSquaredReal"])) parsIm = self.iterReduceQuadratureDegree(zip([n2Im], ["refractionIndexSquaredImag"])) - a1Re = - n2Re * fen.dot(self.u, self.v) * fen.dx - a1Im = - n2Im * fen.dot(self.u, self.v) * fen.dx + a1Re = - n2Re * self.u * self.v * fen.dx + a1Im = - n2Im * self.u * self.v * fen.dx self.As[1] = (fenics2Sparse(a1Re, parsRe, DirichletBC0, 0) + 1.j * fenics2Sparse(a1Im, parsIm, DirichletBC0, 0)) vbMng(self, "DEL", "Done assembling operator term.", 20) class ScatteringProblemEngine(HelmholtzProblemEngine): """ Solver for scattering problems with parametric wavenumber. - \nabla \cdot (a \nabla u) - omega^2 * n**2 * u = f in \Omega u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu +- i omega u = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real FE space. u: Generic trial functions for variational form evaluation. v: Generic test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. signR: Sign in ABC. omega: Value of omega. diffusivity: Value of a. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ def __init__(self, *args, **kwargs): self.silenceWarnings = True super().__init__(*args, **kwargs) self._affinePoly = True del self.silenceWarnings self.nAs = 3 self.parameterMap = pMap(1., self.npar) self.signR = - 1. @property def RobinDatumH(self): """Value of h.""" if not hasattr(self, "silenceWarnings"): RROMPyWarning("Scattering problems do not allow changes of h.") return self.signR @RobinDatumH.setter def RobinDatumH(self, RobinDatumH): if not hasattr(self, "silenceWarnings"): RROMPyWarning(("Scattering problems do not allow changes of h. " "Ignoring assignment.")) return @property def signR(self): """Value of signR.""" return self._signR @signR.setter def signR(self, signR): self.resetAs() self._signR = signR def buildA(self): """Build terms of operator of linear system.""" if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs) if self.As[0] is None: vbMng(self, "INIT", "Assembling operator term A0.", 20) DirichletBC0 = fen.DirichletBC(self.V, fenZERO, self.DirichletBoundary) aRe, aIm = self.diffusivity parsRe = self.iterReduceQuadratureDegree(zip([aRe], ["diffusivityReal"])) parsIm = self.iterReduceQuadratureDegree(zip([aIm], ["diffusivityImag"])) - a0Re = aRe * fen.dot(fen.grad(self.u), fen.grad(self.v)) * fen.dx - a0Im = aIm * fen.dot(fen.grad(self.u), fen.grad(self.v)) * fen.dx + a0Re = aRe * fen.inner(fen.grad(self.u), fen.grad(self.v)) * fen.dx + a0Im = aIm * fen.inner(fen.grad(self.u), fen.grad(self.v)) * fen.dx self.As[0] = (fenics2Sparse(a0Re, parsRe, DirichletBC0, 1) + 1.j * fenics2Sparse(a0Im, parsIm, DirichletBC0, 0)) vbMng(self, "DEL", "Done assembling operator term.", 20) if self.As[1] is None: self.autoSetDS() vbMng(self, "INIT", "Assembling operator term A1.", 20) DirichletBC0 = fen.DirichletBC(self.V, fenZERO, self.DirichletBoundary) - a1 = fen.dot(self.u, self.v) * self.ds(1) + a1 = self.u * self.v * self.ds(1) self.As[1] = (self.signR * 1.j * fenics2Sparse(a1, {}, DirichletBC0, 0)) vbMng(self, "DEL", "Done assembling operator term.", 20) if self.As[2] is None: vbMng(self, "INIT", "Assembling operator term A2.", 20) DirichletBC0 = fen.DirichletBC(self.V, fenZERO, self.DirichletBoundary) nRe, nIm = self.refractionIndex n2Re, n2Im = nRe * nRe - nIm * nIm, 2 * nRe * nIm parsRe = self.iterReduceQuadratureDegree(zip([n2Re], ["refractionIndexSquaredReal"])) parsIm = self.iterReduceQuadratureDegree(zip([n2Im], ["refractionIndexSquaredImag"])) - a2Re = - n2Re * fen.dot(self.u, self.v) * fen.dx - a2Im = - n2Im * fen.dot(self.u, self.v) * fen.dx + a2Re = - n2Re * self.u * self.v * fen.dx + a2Im = - n2Im * self.u * self.v * fen.dx self.As[2] = (fenics2Sparse(a2Re, parsRe, DirichletBC0, 0) + 1.j * fenics2Sparse(a2Im, parsIm, DirichletBC0, 0)) vbMng(self, "DEL", "Done assembling operator term.", 20) diff --git a/rrompy/hfengines/fenics_engines/laplace_base_problem_engine.py b/rrompy/hfengines/fenics_engines/laplace_base_problem_engine.py index 4ebd0a1..472005d 100644 --- a/rrompy/hfengines/fenics_engines/laplace_base_problem_engine.py +++ b/rrompy/hfengines/fenics_engines/laplace_base_problem_engine.py @@ -1,252 +1,252 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np import fenics as fen from rrompy.hfengines.base.linear_affine_engine import LinearAffineEngine from rrompy.hfengines.base.fenics_engine_base import FenicsEngineBase from rrompy.utilities.base.types import paramVal from rrompy.solver.fenics import (fenZERO, fenONE, H1NormMatrix, Hminus1NormMatrix) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.parameter import checkParameter from rrompy.solver.fenics import fenics2Sparse, fenics2Vector __all__ = ['LaplaceBaseProblemEngine'] class LaplaceBaseProblemEngine(LinearAffineEngine, FenicsEngineBase): """ Solver for generic Laplace problems. - \nabla \cdot (a \nabla u) = f in \Omega u = u0 on \Gamma_D \partial_nu = g1 on \Gamma_N \partial_nu + h u = g2 on \Gamma_R Attributes: verbosity: Verbosity level. BCManager: Boundary condition manager. V: Real FE space. u: Generic trial functions for variational form evaluation. v: Generic test functions for variational form evaluation. As: Scipy sparse array representation (in CSC format) of As. bs: Numpy array representation of bs. cs: Numpy array representation of cs. energyNormMatrix: Scipy sparse matrix representing inner product over V. energyNormDualMatrix: Scipy sparse matrix representing dual inner product between Riesz representers V-V. degree_threshold: Threshold for ufl expression interpolation degree. omega: Value of omega. diffusivity: Value of a. forcingTerm: Value of f. DirichletDatum: Value of u0. NeumannDatum: Value of g1. RobinDatumG: Value of g2. RobinDatumH: Value of h. DirichletBoundary: Function handle to \Gamma_D. NeumannBoundary: Function handle to \Gamma_N. RobinBoundary: Function handle to \Gamma_R. ds: Boundary measure 2-tuple (resp. for Neumann and Robin boundaries). dsToBeSet: Whether ds needs to be set. """ _energyDualNormCompress = None def __init__(self, mu0 : paramVal = [], degree_threshold : int = np.inf, verbosity : int = 10, timestamp : bool = True): super().__init__(degree_threshold = degree_threshold, verbosity = verbosity, timestamp = timestamp) self._affinePoly = True self.mu0 = checkParameter(mu0) self.npar = self.mu0.shape[1] self.omega = np.abs(self.mu0(0, 0)) if self.npar > 0 else 0. self.diffusivity = fenONE self.forcingTerm = fenZERO self.DirichletDatum = fenZERO self.NeumannDatum = fenZERO self.RobinDatumG = fenZERO self.RobinDatumH = fenZERO @property def diffusivity(self): """Value of a.""" return self._diffusivity @diffusivity.setter def diffusivity(self, diffusivity): self.resetAs() if not isinstance(diffusivity, (list, tuple,)): diffusivity = [diffusivity, fenZERO] self._diffusivity = diffusivity @property def forcingTerm(self): """Value of f.""" return self._forcingTerm @forcingTerm.setter def forcingTerm(self, forcingTerm): self.resetbs() if not isinstance(forcingTerm, (list, tuple,)): forcingTerm = [forcingTerm, fenZERO] self._forcingTerm = forcingTerm @property def DirichletDatum(self): """Value of u0.""" return self._DirichletDatum @DirichletDatum.setter def DirichletDatum(self, DirichletDatum): self.resetbs() if not isinstance(DirichletDatum, (list, tuple,)): DirichletDatum = [DirichletDatum, fenZERO] self._DirichletDatum = DirichletDatum @property def NeumannDatum(self): """Value of g1.""" return self._NeumannDatum @NeumannDatum.setter def NeumannDatum(self, NeumannDatum): self.resetbs() if not isinstance(NeumannDatum, (list, tuple,)): NeumannDatum = [NeumannDatum, fenZERO] self._NeumannDatum = NeumannDatum @property def RobinDatumG(self): """Value of g2.""" return self._RobinDatumG @RobinDatumG.setter def RobinDatumG(self, RobinDatumG): self.resetbs() if not isinstance(RobinDatumG, (list, tuple,)): RobinDatumG = [RobinDatumG, fenZERO] self._RobinDatumG = RobinDatumG @property def RobinDatumH(self): """Value of h.""" return self._RobinDatumH @RobinDatumH.setter def RobinDatumH(self, RobinDatumH): self.resetAs() if not isinstance(RobinDatumH, (list, tuple,)): RobinDatumH = [RobinDatumH, fenZERO] self._RobinDatumH = RobinDatumH @property def DirichletBoundary(self): """Function handle to DirichletBoundary.""" return self.BCManager.DirichletBoundary @DirichletBoundary.setter def DirichletBoundary(self, DirichletBoundary): self.resetAs() self.resetbs() self.BCManager.DirichletBoundary = DirichletBoundary @property def NeumannBoundary(self): """Function handle to NeumannBoundary.""" return self.BCManager.NeumannBoundary @NeumannBoundary.setter def NeumannBoundary(self, NeumannBoundary): self.resetAs() self.resetbs() self.dsToBeSet = True self.BCManager.NeumannBoundary = NeumannBoundary @property def RobinBoundary(self): """Function handle to RobinBoundary.""" return self.BCManager.RobinBoundary @RobinBoundary.setter def RobinBoundary(self, RobinBoundary): self.resetAs() self.resetbs() self.dsToBeSet = True self.BCManager.RobinBoundary = RobinBoundary def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ vbMng(self, "INIT", "Assembling energy matrix.", 20) self.energyNormMatrix = H1NormMatrix(self.V, np.abs(self.omega)**2) vbMng(self, "DEL", "Done assembling energy matrix.", 20) def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ vbMng(self, "INIT", "Assembling energy dual matrix.", 20) self.energyNormDualMatrix = Hminus1NormMatrix( self.V, np.abs(self.omega)**2, compressRank = self._energyDualNormCompress) vbMng(self, "DEL", "Done assembling energy dual matrix.", 20) def buildA(self): """Build terms of operator of linear system.""" if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs) if self.As[0] is None: self.autoSetDS() vbMng(self, "INIT", "Assembling operator term A0.", 20) DirichletBC0 = fen.DirichletBC(self.V, fenZERO, self.DirichletBoundary) aRe, aIm = self.diffusivity hRe, hIm = self.RobinDatumH termNames = ["diffusivity", "RobinDatumH"] parsRe = self.iterReduceQuadratureDegree(zip([aRe, hRe], [x + "Real" for x in termNames])) parsIm = self.iterReduceQuadratureDegree(zip([aIm, hIm], [x + "Imag" for x in termNames])) - a0Re = (aRe * fen.dot(fen.grad(self.u), fen.grad(self.v)) * fen.dx - + hRe * fen.dot(self.u, self.v) * self.ds(1)) - a0Im = (aIm * fen.dot(fen.grad(self.u), fen.grad(self.v)) * fen.dx - + hIm * fen.dot(self.u, self.v) * self.ds(1)) + a0Re = (aRe * fen.inner(fen.grad(self.u), + fen.grad(self.v)) * fen.dx + + hRe * self.u * self.v * self.ds(1)) + a0Im = (aIm * fen.inner(fen.grad(self.u), + fen.grad(self.v)) * fen.dx + + hIm * self.u * self.v * self.ds(1)) self.As[0] = (fenics2Sparse(a0Re, parsRe, DirichletBC0, 1) + 1.j * fenics2Sparse(a0Im, parsIm, DirichletBC0, 0)) vbMng(self, "DEL", "Done assembling operator term.", 20) def buildb(self): """Build terms of operator of linear system.""" if self.thbs[0] is None: self.thbs = self.getMonomialWeights(self.nbs) if self.bs[0] is None: self.autoSetDS() vbMng(self, "INIT", "Assembling forcing term b0.", 20) u0Re, u0Im = self.DirichletDatum fRe, fIm = self.forcingTerm g1Re, g1Im = self.NeumannDatum g2Re, g2Im = self.RobinDatumG termNames = ["forcingTerm", "NeumannDatum", "RobinDatumG"] parsRe = self.iterReduceQuadratureDegree(zip([fRe, g1Re, g2Re], [x + "Real" for x in termNames])) parsIm = self.iterReduceQuadratureDegree(zip([fIm, g1Im, g2Im], [x + "Imag" for x in termNames])) - L0Re = (fen.dot(fRe, self.v) * fen.dx - + fen.dot(g1Re, self.v) * self.ds(0) - + fen.dot(g2Re, self.v) * self.ds(1)) - L0Im = (fen.dot(fIm, self.v) * fen.dx - + fen.dot(g1Im, self.v) * self.ds(0) - + fen.dot(g2Im, self.v) * self.ds(1)) + L0Re = (fRe * self.v * fen.dx + g1Re * self.v * self.ds(0) + + g2Re * self.v * self.ds(1)) + L0Im = (fIm * self.v * fen.dx + g1Im * self.v * self.ds(0) + + g2Im * self.v * self.ds(1)) DBCR = fen.DirichletBC(self.V, u0Re, self.DirichletBoundary) DBCI = fen.DirichletBC(self.V, u0Im, self.DirichletBoundary) self.bs[0] = (fenics2Vector(L0Re, parsRe, DBCR, 1) + 1.j * fenics2Vector(L0Im, parsIm, DBCI, 1)) vbMng(self, "DEL", "Done assembling forcing term.", 20) diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py index e3105bb..db43c3d 100644 --- a/rrompy/reduction_methods/base/generic_approximant.py +++ b/rrompy/reduction_methods/base/generic_approximant.py @@ -1,871 +1,872 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np from collections.abc import Iterable from itertools import product as iterprod from copy import deepcopy as copy from os import remove as osrm from rrompy.sampling import (SamplingEngine, SamplingEngineNormalize, SamplingEnginePOD) from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, Tuple, ListAny, strLst, paramVal, paramList, sampList) from rrompy.utilities.base.data_structures import purgeDict, getNewFilename from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPy_READY, RROMPy_FRAGILE) from rrompy.utilities.base.pickle_utilities import pickleDump, pickleLoad from rrompy.parameter import (emptyParameterList, checkParameter, checkParameterList) from rrompy.sampling import sampleList, emptySampleList from rrompy.utilities.parallel import (bcast, masterCore, listGather, listScatter) __all__ = ['GenericApproximant'] def addNormFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = False val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addNormDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs) -> Np1D: uV = getattr(self.__class__, "get" + fieldName)(self, mu) kwargs["is_state"] = True if "dual" not in kwargs.keys(): kwargs["dual"] = True val = self.HFEngine.norm(uV, *args, **kwargs) return val setattr(self.__class__, "norm" + fieldName, objFunc) def addPlotFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addPlotDualFieldToClass(self, fieldName): def objFunc(self, mu:paramList, *args, **kwargs): uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.plot(u, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "plot" + fieldName, objFunc) def addOutParaviewFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaview"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.outParaview(u, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaview" + fieldName, objFunc) def addOutParaviewTimeDomainFieldToClass(self, fieldName): def objFunc(self, mu:paramVal, *args, **kwargs): if not hasattr(self.HFEngine, "outParaviewTimeDomain"): raise RROMPyException(("High fidelity engine cannot output to " "Paraview.")) uV = getattr(self.__class__, "get" + fieldName)(self, mu) uV = listScatter(uV)[0].T filesOut = [] if len(uV) > 0: omega = args.pop(0) if len(args) > 0 else np.real(mu) if "name" in kwargs.keys(): nameBase = copy(kwargs["name"]) filesOut = [] for j, u in enumerate(uV): if "name" in kwargs.keys(): kwargs["name"] = nameBase + str(j) filesOut += [self.HFEngine.outParaviewTimeDomain(u, omega, *args, **kwargs)] if "name" in kwargs.keys(): kwargs["name"] = nameBase filesOut = listGather(filesOut) if filesOut[0] is None: return None return filesOut setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc) class GenericApproximant: """ ABSTRACT ROM approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': kind of snapshots orthogonalization; allowed values include 0, 1/2, and 1; defaults to 1, i.e. full POD; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of samples current approximant relies upon. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. trainedModel: Trained model evaluator. mu0: Default parameter. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList{Soft,Critical}. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': kind of snapshots orthogonalization; - 'scaleFactorDer': scaling factors for derivative computation. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon. verbosity: Verbosity level. POD: Kind of snapshots orthogonalization. scaleFactorDer: Scaling factors for derivative computation. S: Number of solution snapshots over which current approximant is based upon. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ __all__ += [ftype + dtype for ftype, dtype in iterprod( ["norm", "plot", "outParaview", "outParaviewTimeDomain"], ["HF", "RHS", "Approx", "Res", "Err"])] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, verbosity : int = 10, timestamp : bool = True): self._preInit() self._mode = RROMPy_READY self.verbosity = verbosity self.timestamp = timestamp if not hasattr(self, "_output_lvl"): self._output_lvl = [] self._output_lvl += [1] vbMng(self, "INIT", "Initializing engine of type {}.".format(self.name()), 10) self._HFEngine = HFEngine self.trainedModel = None self.lastSolvedHF = emptyParameterList() self.uHF = emptySampleList() self._addParametersToList(["POD", "scaleFactorDer"], [1, "AUTO"], ["S"], [1.]) if mu0 is None: if hasattr(self.HFEngine, "mu0"): self.mu0 = checkParameter(self.HFEngine.mu0) else: raise RROMPyException(("Center of approximation cannot be " "inferred from HF engine. Parameter " "required")) else: self.mu0 = checkParameter(mu0, self.HFEngine.npar) self.resetSamples() self.approxParameters = approxParameters self._postInit() ### add norm{HF,Approx,Err} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["HF", "Approx", "Err"]: addNormFieldToClass(self, objName) ### add norm{RHS,Res} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. Returns: Target norm of *. """ for objName in ["RHS", "Res"]: addNormDualFieldToClass(self, objName) ### add plot{HF,Approx,Err} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["HF", "Approx", "Err"]: addPlotFieldToClass(self, objName) ### add plot{RHS,Res} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["RHS", "Res"]: addPlotDualFieldToClass(self, objName) ### add outParaview{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file. Args: mu: Target parameter. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewFieldToClass(self, objName) ### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file, converted to time domain. Args: mu: Target parameter. omega(optional): frequency. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewTimeDomainFieldToClass(self, objName) def _preInit(self): if not hasattr(self, "depth"): self.depth = 0 else: self.depth += 1 @property def tModelType(self): raise RROMPyException("No trainedModel type assigned.") def initializeModelData(self, datadict): from .trained_model.trained_model_data import TrainedModelData data = TrainedModelData(datadict["mu0"], datadict["mus"], datadict.pop("projMat"), datadict["scaleFactor"], datadict.pop("parameterMap")) return (data, ["mu0", "scaleFactor", "mus"]) @property def parameterList(self): """Value of parameterListSoft + parameterListCritical.""" return self.parameterListSoft + self.parameterListCritical def _addParametersToList(self, whatSoft : strLst = [], defaultSoft : ListAny = [], whatCritical : strLst = [], defaultCritical : ListAny = [], toBeExcluded : strLst = []): if not hasattr(self, "parameterToBeExcluded"): self.parameterToBeExcluded = [] self.parameterToBeExcluded = toBeExcluded + self.parameterToBeExcluded if not hasattr(self, "parameterListSoft"): self.parameterListSoft = [] if not hasattr(self, "parameterDefaultSoft"): self.parameterDefaultSoft = {} if not hasattr(self, "parameterListCritical"): self.parameterListCritical = [] if not hasattr(self, "parameterDefaultCritical"): self.parameterDefaultCritical = {} for j, what in enumerate(whatSoft): if what not in self.parameterToBeExcluded: self.parameterListSoft = [what] + self.parameterListSoft self.parameterDefaultSoft[what] = defaultSoft[j] for j, what in enumerate(whatCritical): if what not in self.parameterToBeExcluded: self.parameterListCritical = ([what] + self.parameterListCritical) self.parameterDefaultCritical[what] = defaultCritical[j] def _postInit(self): if self.depth == 0: vbMng(self, "DEL", "Done initializing.", 10) del self.depth else: self.depth -= 1 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def setupSampling(self, reset_samples : bool = True): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD == 1: sEng = SamplingEnginePOD elif self.POD == 1/2: sEng = SamplingEngineNormalize else: sEng = SamplingEngine self.samplingEngine = sEng(self.HFEngine, verbosity = self.verbosity) if reset_samples: self.resetSamples() @property def HFEngine(self): """Value of HFEngine.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): raise RROMPyException("Cannot change HFEngine.") @property def mu0(self): """Value of mu0.""" return self._mu0 @mu0.setter def mu0(self, mu0): mu0 = checkParameter(mu0) if not hasattr(self, "_mu0") or mu0 != self.mu0: self.resetSamples() self._mu0 = mu0 @property def npar(self): """Number of parameters.""" return self.mu0.shape[1] def checkParameterList(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.npar, check_if_single) def mapParameterList(self, *args, **kwargs): return self.HFEngine.mapParameterList(*args, **kwargs) @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): if not hasattr(self, "approxParameters"): self._approxParameters = {} approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) keyList = list(approxParameters.keys()) for key in self.parameterListCritical: if key in keyList: setattr(self, "_" + key, self.parameterDefaultCritical[key]) for key in self.parameterListSoft: if key in keyList: setattr(self, "_" + key, self.parameterDefaultSoft[key]) fragile = False for key in self.parameterListCritical: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: fragile = True val = self.parameterDefaultCritical[key] if self._mode == RROMPy_FRAGILE: setattr(self, "_" + key, val) self.approxParameters[key] = val else: getattr(self.__class__, key, None).fset(self, val) for key in self.parameterListSoft: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultSoft[key] if self._mode == RROMPy_FRAGILE: setattr(self, "_" + key, val) self.approxParameters[key] = val else: getattr(self.__class__, key, None).fset(self, val) if fragile: self._mode = RROMPy_FRAGILE @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): if hasattr(self, "_POD"): PODold = self.POD else: PODold = -1 if POD not in [0, 1/2, 1]: raise RROMPyException("POD must be either 0, 1/2, or 1.") self._POD = POD self._approxParameters["POD"] = self.POD if PODold != self.POD: self.samplingEngine = None self.resetSamples() @property def scaleFactorDer(self): """Value of scaleFactorDer.""" if self._scaleFactorDer == "NONE": return 1. if self._scaleFactorDer == "AUTO": return self.scaleFactor return self._scaleFactorDer @scaleFactorDer.setter def scaleFactorDer(self, scaleFactorDer): if isinstance(scaleFactorDer, (str,)): scaleFactorDer = scaleFactorDer.upper() elif isinstance(scaleFactorDer, Iterable): scaleFactorDer = list(scaleFactorDer) self._scaleFactorDer = scaleFactorDer self._approxParameters["scaleFactorDer"] = self._scaleFactorDer @property def scaleFactorRel(self): """Value of scaleFactorDer / scaleFactor.""" if self._scaleFactorDer == "AUTO": return None try: return np.divide(self.scaleFactorDer, self.scaleFactor) except: raise RROMPyException(("Error in computation of relative scaling " "factor. Make sure that scaleFactor is " "properly initialized.")) from None @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if S <= 0: raise RROMPyException("S must be positive.") if hasattr(self, "_S") and self._S is not None: Sold = self.S else: Sold = -1 self._S = S self._approxParameters["S"] = self.S if Sold != self.S: self.resetSamples() @property def trainedModel(self): """Value of trainedModel.""" return self._trainedModel @trainedModel.setter def trainedModel(self, trainedModel): self._trainedModel = trainedModel if self._trainedModel is not None: self._trainedModel.reset() self.lastSolvedApproxReduced = emptyParameterList() self.lastSolvedApprox = emptyParameterList() self.uApproxReduced = emptySampleList() self.uApprox = emptySampleList() def resetSamples(self): if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() else: self.setupSampling() self._mode = RROMPy_READY def plotSamples(self, *args, **kwargs) -> List[str]: """ Do some nice plots of the samples. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot plot samples.") return self.samplingEngine.plotSamples(*args, **kwargs) def outParaviewSamples(self, *args, **kwargs) -> List[str]: """ Output samples to ParaView file. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewSamples(*args, **kwargs) def outParaviewTimeDomainSamples(self, *args, **kwargs) -> List[str]: """ Output samples to ParaView file, converted to time domain. Returns: Output filenames. """ RROMPyAssert(self._mode, message = "Cannot output samples.") return self.samplingEngine.outParaviewTimeDomainSamples(*args, **kwargs) def setTrainedModel(self, model): """Deepcopy approximation from trained model.""" if hasattr(model, "storeTrainedModel"): verb = model.verbosity model.verbosity = 0 fileOut = model.storeTrainedModel() model.verbosity = verb else: try: fileOut = getNewFilename("trained_model", "pkl") pickleDump(model.data.__dict__, fileOut) except: raise RROMPyException(("Failed to store model data. Parameter " "model must have either " "storeTrainedModel or " "data.__dict__ properties.")) from None self.loadTrainedModel(fileOut) osrm(fileOut) @abstractmethod def setupApprox(self) -> int: """ Setup approximant. (ABSTRACT) Any specialization should include something like self.trainedModel = ... self.trainedModel.data = ... self.trainedModel.data.approxParameters = copy( self.approxParameters) Returns > 0 if error was encountered, < 0 if no computation was necessary. """ if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) pass vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None and self.trainedModel.data.approxParameters == self.approxParameters and len(self.mus) == len(self.trainedModel.data.mus)) def _pruneBeforeEval(self, mu:paramList, field:str, append:bool, prune:bool) -> Tuple[paramList, Np1D, Np1D, bool]: mu = self.checkParameterList(mu) idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muExtra = emptyParameterList() lastSolvedMus = getattr(self, "lastSolved" + field) if (len(mu) > 0 and len(mu) == len(lastSolvedMus) and mu == lastSolvedMus): idx = np.arange(len(mu), dtype = np.int) return muExtra, jExtra, idx, True muKeep = copy(muExtra) for j in range(len(mu)): jPos = lastSolvedMus.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: lastSolvedu = getattr(self, "u" + field) idx[~jExtra] = getattr(self.__class__, "set" + field)(self, muKeep, lastSolvedu[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu return muExtra, jExtra, idx, append def _setObject(self, mu:paramList, field:str, object:sampList, append:bool) -> List[int]: newMus = self.checkParameterList(mu) newObj = sampleList(object) if append: getattr(self, "lastSolved" + field).append(newMus) getattr(self, "u" + field).append(newObj) Ltot = len(getattr(self, "u" + field)) return list(range(Ltot - len(newObj), Ltot)) setattr(self, "lastSolved" + field, copy(newMus)) setattr(self, "u" + field, copy(newObj)) return list(range(len(getattr(self, "u" + field)))) def setHF(self, muHF:paramList, uHF:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muHF, "HF", uHF, append) def evalHF(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Find high fidelity solution with original parameters and arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "HF", append, prune) if len(muExtra) > 0: - vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), - 15) + muExtra = self.checkParameterList(muExtra) + vbMng(self, "INIT", + "Solving HF model for mu = {}.".format(muExtra), 15) newuHFs = self.HFEngine.solve(muExtra) vbMng(self, "DEL", "Done solving HF model.", 15) idx[jExtra] = self.setHF(muExtra, newuHFs, append) return list(idx) def setApproxReduced(self, muApproxR:paramList, uApproxR:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApproxR, "ApproxReduced", uApproxR, append) def evalApproxReduced(self, mu:paramList, append : bool = False, prune : bool = False) -> List[int]: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "ApproxReduced", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApproxReduced(muExtra) idx[jExtra] = self.setApproxReduced(muExtra, newuApproxs, append) return list(idx) def setApprox(self, muApprox:paramList, uApprox:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" return self._setObject(muApprox, "Approx", uApprox, append) def evalApprox(self, mu:paramList, append : bool = False, prune : bool = False) -> List[int]: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ self.setupApprox() muExtra, jExtra, idx, append = self._pruneBeforeEval(mu, "Approx", append, prune) if len(muExtra) > 0: newuApproxs = self.trainedModel.getApprox(muExtra) idx[jExtra] = self.setApprox(muExtra, newuApproxs, append) return list(idx) def getHF(self, *args, **kwargs) -> sampList: """ Get HF solution at arbitrary parameter. Returns: HFsolution. """ idx = self.evalHF(*args, **kwargs) return self.uHF(idx) def getRHS(self, mu:paramList) -> sampList: """ Get linear system RHS at arbitrary parameter. Args: mu: Target parameter. Returns: Linear system RHS. """ return self.HFEngine.residual(mu, None) def getApproxReduced(self, *args, **kwargs) -> sampList: """ Get approximant at arbitrary parameter. Returns: Reduced approximant. """ idx = self.evalApproxReduced(*args, **kwargs) return self.uApproxReduced(idx) def getApprox(self, *args, **kwargs) -> sampList: """ Get approximant at arbitrary parameter. Returns: Approximant. """ idx = self.evalApprox(*args, **kwargs) return self.uApprox(idx) def getRes(self, mu:paramList, *args, **kwargs) -> sampList: """ Get residual at arbitrary parameter. Args: mu: Target parameter. Returns: Approximant residual. """ if not self.HFEngine.isCEye: raise RROMPyException(("Residual of solution with non-scalar C " "not computable.")) return self.HFEngine.residual(mu, self.getApprox(mu, *args, **kwargs) / self.HFEngine.C(mu)) def getErr(self, *args, **kwargs) -> sampList: """ Get error at arbitrary parameter. Returns: Approximant error. """ return self.getApprox(*args, **kwargs) - self.getHF(*args, **kwargs) def getPoles(self, *args, **kwargs) -> paramList: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ self.setupApprox() vbMng(self, "INIT", "Computing poles of model.", 20) poles = self.trainedModel.getPoles(*args, **kwargs) vbMng(self, "DEL", "Done computing poles.", 20) return poles def storeSamples(self, filenameBase : str = "samples", forceNewFile : bool = True) -> str: """Store samples to file.""" filename = filenameBase + "_" + self.name() if forceNewFile: filename = getNewFilename(filename, "pkl")[: - 4] return self.samplingEngine.store(filename, False) def storeTrainedModel(self, filenameBase : str = "trained_model", forceNewFile : bool = True) -> str: """Store trained reduced model to file.""" self.setupApprox() filename = None if masterCore(): vbMng(self, "INIT", "Storing trained model to file.", 20) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) pickleDump(self.trainedModel.data.__dict__, filename) vbMng(self, "DEL", "Done storing trained model.", 20) filename = bcast(filename) return filename def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" vbMng(self, "INIT", "Loading pre-trained model from file.", 20) datadict = pickleLoad(filename) self.mu0 = datadict["mu0"] self.scaleFactor = datadict["scaleFactor"] self.mus = datadict["mus"] self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data, selfkeys = self.initializeModelData(datadict) for key in selfkeys: setattr(self, key, datadict.pop(key)) approxParameters = datadict.pop("approxParameters") data.approxParameters = copy(approxParameters) for apkey in data.approxParameters.keys(): self._approxParameters[apkey] = approxParameters.pop(apkey) setattr(self, "_" + apkey, self._approxParameters[apkey]) for key in datadict: setattr(data, key, datadict[key]) self.trainedModel.data = data self._mode = RROMPy_FRAGILE vbMng(self, "DEL", "Done loading pre-trained model.", 20) diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py index 3a31d84..5ea1d21 100644 --- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py +++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py @@ -1,241 +1,229 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np -from collections.abc import Iterable from rrompy.reduction_methods.standard.trained_model.trained_model_rational \ import TrainedModelRational from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.poly_fitting.heaviside import rational2heaviside from rrompy.utilities.poly_fitting.nearest_neighbor import ( NearestNeighborInterpolator as NNI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList from rrompy.sampling import emptySampleList __all__ = ['TrainedModelPivotedRationalNoMatch'] class TrainedModelPivotedRationalNoMatch(TrainedModelRational): """ ROM approximant evaluation for pivoted approximants based on interpolation of rational approximants (without pole matching). Attributes: Data: dictionary with all that can be pickled. """ def checkParameterListPivot(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparPivot, check_if_single) def checkParameterListMarginal(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparMarginal, check_if_single) def compress(self, collapse : bool = False, tol : float = 0., returnRMat : bool = False, **compressMatrixkwargs): if not collapse and tol <= 0.: return RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, **compressMatrixkwargs) if hasattr(self.data, "Ps"): for obj, suppj in zip(self.data.Ps, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_PsExcl"): for obj, suppj in zip(self._PsExcl, self._PsuppExcl): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) self._PsuppExcl = [0] * len(self._PsuppExcl) self.data.Psupp = [0] * len(self.data.Psupp) super(TrainedModelRational, self).compress(collapse, tol) if returnRMat: return RMat def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = self.checkParameterListPivot(mu) if mu0 is None: mu0 = self.checkParameterListPivot( self.data.mu0(0, self.data.directionPivot)) return (self.mapParameterList(mu, idx = self.data.directionPivot) - self.mapParameterList(mu0, idx = self.data.directionPivot) ) / [self.data.scaleFactor[x] for x in self.data.directionPivot] def setupMarginalInterp(self, interpPars:ListAny): self.data.marginalInterp = NNI() self.data.marginalInterp.setupByInterpolation(self.data.musMarginal, np.arange(len(self.data.musMarginal)), 1, *interpPars) def updateEffectiveSamples(self, exclude:List[int]): if hasattr(self, "_idxExcl"): for j, excl in enumerate(self._idxExcl): self.data.musMarginal.insert(self._musMExcl[j], excl) self.data.Ps.insert(excl, self._PsExcl[j]) self.data.Qs.insert(excl, self._QsExcl[j]) self.data.Psupp.insert(excl, self._PsuppExcl[j]) self._idxExcl, self._musMExcl = list(np.sort(exclude)), [] self._PsExcl, self._QsExcl, self._PsuppExcl = [], [], [] for excl in self._idxExcl[::-1]: self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl self.data.musMarginal.pop(excl) self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl self._PsuppExcl = [self.data.Psupp.pop(excl)] + self._PsuppExcl def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = self.checkParameterListMarginal(mu(self.data.directionMarginal)) idxMUnique, idxMmap = np.unique(self.data.marginalInterp(muM), return_inverse = True) idxMUnique = np.array(idxMUnique, dtype = int) p = emptySampleList() vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17) for i, iM in enumerate(idxMUnique): idx = np.where(idxMmap == i)[0] Pval, supp = self.data.Ps[iM](muP[idx]), self.data.Psupp[iM] if i == 0: p.reset((self.data.projMat.shape[1], len(mu)), dtype = Pval.dtype) p.data[:] = 0. p.data[supp : supp + len(Pval), idx] = Pval vbMng(self, "DEL", "Done evaluating numerator.", 17) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = self.checkParameterListMarginal(mu(self.data.directionMarginal)) if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] idxMUnique, idxMmap = np.unique(self.data.marginalInterp(muM), return_inverse = True) idxMUnique = np.array(idxMUnique, dtype = int) vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu), 17) for i, iM in enumerate(idxMUnique): idx = np.where(idxMmap == i)[0] Qval = self.data.Qs[iM](muP[idx], derP, sclP) if i == 0: q = np.empty(len(mu), dtype = Qval.dtype) q[idx] = Qval vbMng(self, "DEL", "Done evaluating denominator.", 17) return q def getPoles(self, marginalVals : ListAny = [fp]) -> paramList: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mVals = list(marginalVals) rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim)[0] muM = self.checkParameterListMarginal([mVals[j] for j in range(len(mVals)) if j != rDim]) iM = int(self.data.marginalInterp(muM)) roots = self.data.scaleFactor[rDim] * self.data.Qs[iM].roots() return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim), idx = [rDim])(0, 0) + roots, "B", [rDim])(0) def getResidues(self, *args, **kwargs) -> Tuple[paramList, Np2D]: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ - RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") - if len(args) + len(kwargs) > 1: - raise RROMPyException(("Wrong number of parameters passed. " - "Only 1 available.")) - elif len(args) + len(kwargs) == 1: - if len(args) == 1: - mVals = args[0] - else: - mVals = kwargs["marginalVals"] - if not isinstance(mVals, Iterable): mVals = [mVals] - mVals = list(mVals) - else: + pls = self.getPoles(*args, **kwargs) + if len(pls) == 0: + return pls, np.empty((0, 0), dtype = self.data.Ps[0].coeffs.dtype) + if len(args) == 1: + mVals = args[0] + elif len(args) == 0: mVals = [fp] + else: + mVals = kwargs["marginalVals"] rDim = mVals.index(fp) - if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: - raise RROMPyException(("Exactly 1 'freepar' entry in " - "marginalVals must be provided.")) - if rDim != self.data.directionPivot[0]: - raise RROMPyException(("'freepar' entry in marginalVals must " - "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim)[0] muM = self.checkParameterListMarginal([mVals[j] for j in range(len(mVals)) if j != rDim]) iM = int(self.data.marginalInterp(muM)) - res, pls, basis = rational2heaviside(self.data.Ps[iM], - self.data.Qs[iM]) + res = rational2heaviside(self.data.Ps[iM], self.data.Qs[iM])[0] res = res[: len(pls), :].T if not self.data._collapsed: res = dot(self.data.projMat, res).T return pls, res diff --git a/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py index a6a1f15..692d4bd 100644 --- a/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py +++ b/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py @@ -1,625 +1,630 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod from copy import deepcopy as copy import numpy as np from matplotlib import pyplot as plt from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from rrompy.reduction_methods.standard.generic_standard_approximant import ( GenericStandardApproximant) from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.sampling.sample_list import sampleList from rrompy.parameter import emptyParameterList, parameterList from rrompy.utilities.parallel import masterCore __all__ = ['GenericGreedyApproximant'] def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D: return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)]) - badmus[..., np.newaxis].T, axis = 1) def pruneSamples(mus:paramList, badmus:paramList, tol : float = 1e-8) -> Np1D: """Remove from mus all the elements which are too close to badmus.""" if isinstance(mus, (parameterList, sampleList)): mus = mus.data if isinstance(badmus, (parameterList, sampleList)): badmus = badmus.data if len(badmus) == 0: return np.arange(len(mus)) proximity = np.min(localL2Distance(mus, badmus), axis = 1) return np.where(proximity <= tol)[0] class GenericGreedyApproximant(GenericStandardApproximant): """ ROM greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': kind of snapshots orthogonalization; allowed values include 0, 1/2, and 1; defaults to 1, i.e. POD; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'samplerTrainSet': training sample points generator; defaults to sampler. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': kind of snapshots orthogonalization; - 'scaleFactorDer': scaling factors for derivative computation; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'samplerTrainSet': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. verbosity: Verbosity level. POD: Kind of snapshots orthogonalization. scaleFactorDer: Scaling factors for derivative computation. S: number of test points. sampler: Sample point generator. greedyTol: Uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. samplerTrainSet: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, *args, **kwargs): self._preInit() if not hasattr(self, "_affine_lvl"): self._affine_lvl = [] self._affine_lvl += [1] self._addParametersToList(["greedyTol", "collinearityTol", "maxIter", "nTestPoints", "samplerTrainSet"], [1e-2, 0., 1e2, 5e2, "AUTO"]) super().__init__(*args, **kwargs) self._postInit() @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def collinearityTol(self): """Value of collinearityTol.""" return self._collinearityTol @collinearityTol.setter def collinearityTol(self, collinearityTol): if collinearityTol < 0: raise RROMPyException("collinearityTol must be non-negative.") if (hasattr(self, "_collinearityTol") and self.collinearityTol is not None): collinearityTolold = self.collinearityTol else: collinearityTolold = -1 self._collinearityTol = collinearityTol self._approxParameters["collinearityTol"] = self.collinearityTol if collinearityTolold != self.collinearityTol: self.resetSamples() @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def samplerTrainSet(self): """Value of samplerTrainSet.""" return self._samplerTrainSet @samplerTrainSet.setter def samplerTrainSet(self, samplerTrainSet): if (isinstance(samplerTrainSet, (str,)) and samplerTrainSet.upper() == "AUTO"): samplerTrainSet = self.sampler if 'generatePoints' not in dir(samplerTrainSet): raise RROMPyException("samplerTrainSet type not recognized.") if (hasattr(self, '_samplerTrainSet') and self.samplerTrainSet not in [None, "AUTO"]): samplerTrainSetOld = self.samplerTrainSet self._samplerTrainSet = samplerTrainSet self._approxParameters["samplerTrainSet"] = self.samplerTrainSet if (not 'samplerTrainSetOld' in locals() or samplerTrainSetOld != self.samplerTrainSet): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = emptyParameterList() def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \ -> Tuple[Np1D, Np1D, Np1D]: self.assembleReducedResidualBlocks(full = rA is not None) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0) if rA is None: return ff # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2) * rb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2) * rA.conj(), axis = (0, 1)) return ff, Lf, LL def getErrorEstimatorAffine(self, mus:Np1D) -> Np1D: """Standard residual estimator.""" checkIfAffine(self.HFEngine, "apply affinity-based error estimator", False, self._affine_lvl) self.HFEngine.buildA() self.HFEngine.buildb() mus = self.checkParameterList(mus) tMverb, self.trainedModel.verbosity = self.trainedModel.verbosity, 0 uApproxRs = self.getApproxReduced(mus).data self.trainedModel.verbosity = tMverb muTestEff = self.mapParameterList(mus) radiusA = np.empty((len(self.HFEngine.thAs), len(mus)), dtype = np.complex) radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): radiusA[j] = expressionEvaluator(thA[0], muTestEff) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) radiusA = np.expand_dims(uApproxRs, 1) * radiusA ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 return err def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: setupOK = self.setupApproxLocal() if setupOK > 0: err = np.empty(len(mus)) err[:] = np.nan if not return_max: return err return err, [- setupOK], np.nan mus = self.checkParameterList(mus) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) err = self.getErrorEstimatorAffine(mus) vbMng(self.trainedModel, "DEL", "Done evaluating error estimator.", 10) if not return_max: return err idxMaxEst = [np.argmax(err)] return err, idxMaxEst, err[idxMaxEst] def _isLastSampleCollinear(self) -> bool: """Check collinearity of last sample.""" if self.collinearityTol <= 0.: return False if self.POD == 1: reff = self.samplingEngine.Rscale[:, -1] else: RROMPyWarning(("Repeated orthogonalization of the samples for " "collinearity check. Consider setting POD to " "True.")) if not hasattr(self, "_PODEngine"): from rrompy.sampling import PODEngine self._PODEngine = PODEngine(self.HFEngine) reff = self._PODEngine.generalizedQR(self.samplingEngine.samples, only_R = True, is_state = True)[:, -1] cLevel = np.abs(reff[-1]) / np.linalg.norm(reff) cLevel = np.inf if np.isclose(cLevel, 0., atol = 1e-15) else 1 / cLevel vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 3) return cLevel > self.collinearityTol def plotEstimator(self, est:Np1D, idxMax:List[int], estMax:List[float]): if (not (np.any(np.isnan(est)) or np.any(np.isinf(est))) and masterCore()): fig = plt.figure(figsize = plt.figaspect(1. / self.npar)) for jpar in range(self.npar): ax = fig.add_subplot(1, self.npar, 1 + jpar) musre = np.array(self.muTest.re.data) errCP = copy(est) idx = np.delete(np.arange(self.npar), jpar) while len(musre) > 0: if self.npar == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, idx] - musre[0, idx]), 1), 0., atol = 1e-15))[0] ax.semilogy(musre[currIdx, jpar], errCP[currIdx], 'k', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) ax.semilogy([self.muBounds.re(0, jpar), self.muBounds.re(-1, jpar)], [self.greedyTol] * 2, 'r--') ax.semilogy(self.mus.re(jpar), 2. * self.greedyTol * np.ones(len(self.mus)), '*m') if len(idxMax) > 0 and estMax is not None: ax.semilogy(self.muTest.re(idxMax, jpar), estMax, 'xr') ax.set_xlim(*list(self.sampler.lims.re(jpar))) ax.grid() plt.tight_layout() plt.show() def greedyNextSample(self, muidx:int, plotEst : str = "NONE")\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") mus = copy(self.muTest[muidx]) self.muTest.pop(muidx) for j, mu in enumerate(mus): vbMng(self, "MAIN", ("Adding sample point no. {} at {} to training " "set.").format(len(self.mus) + 1, mu), 3) self.mus.append(mu) self._S = len(self.mus) self._approxParameters["S"] = self.S if (self.samplingEngine.nsamples <= len(mus) - j - 1 or not np.allclose(mu, self.samplingEngine.mus[j - len(mus)])): self.samplingEngine.nextSample(mu) if self._isLastSampleCollinear(): vbMng(self, "MAIN", ("Collinearity above tolerance detected. Starting " "preemptive greedy loop termination."), 3) self._collinearityFlag = 1 errorEstTest = np.empty(len(self.muTest)) errorEstTest[:] = np.nan return errorEstTest, [-1], np.nan, np.nan errorEstTest, muidx, maxErrorEst = self.errorEstimator(self.muTest, True) if plotEst == "ALL": self.plotEstimator(errorEstTest, muidx, maxErrorEst) return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self.resetSamples() self.computeScaleFactor() self.samplingEngine.scaleFactor = self.scaleFactorDer self.mus = self.samplerTrainSet.generatePoints(self.S) while len(self.mus) > self.S: self.mus.pop() muTestBase = self.sampler.generatePoints(self.nTestPoints, False) idxPop = pruneSamples(self.mapParameterList(muTestBase), self.mapParameterList(self.mus), 1e-10 * self.scaleFactor[0]) muTestBase.pop(idxPop) muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 3) self.samplingEngine.iterSample(self.mus) self._S = len(self.mus) self._approxParameters["S"] = self.S self.muTest = emptyParameterList() self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) self.muTest.data[: -1] = muTestBase.data self.muTest.data[-1] = muLast.data @abstractmethod def setupApproxLocal(self) -> int: if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up local approximant.", 5) pass vbMng(self, "DEL", "Done setting up local approximant.", 5) return 0 + _postGreedyRecover = 1 def setupApprox(self, plotEst : str = "NONE") -> int: """Compute greedy snapshots of solution map.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) vbMng(self, "INIT", "Starting computation of snapshots.", 5) self._collinearityFlag = 0 self._preliminaryTraining() muidx, self.firstGreedyIter = [len(self.muTest) - 1], True errorEstTest, maxErrorEst = [np.inf], np.inf max2ErrorEst, trainedModelOld = np.inf, None while self.firstGreedyIter or (len(self.muTest) > 0 and (maxErrorEst is None or max2ErrorEst > self.greedyTol) and self.samplingEngine.nsamples < self.maxIter): muTestOld, errorEstTestOld = self.muTest, errorEstTest muidxOld, maxErrorEstOld = muidx, maxErrorEst errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(muidx, plotEst) if maxErrorEst is not None and (np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst))): if self._collinearityFlag == 0 and not self.firstGreedyIter: RROMPyWarning(("Instability in a posteriori " "estimator. Starting preemptive greedy " "loop termination.")) self.muTest, errorEstTest = muTestOld, errorEstTestOld if self.firstGreedyIter and muidx[0] < 0: self.trainedModel = None + if self._collinearityFlag: + raise RROMPyException(("Starting sample points too " + "collinear. Aborting greedy " + "iterations.")) raise RROMPyException(("Instability in approximant " "computation. Aborting greedy " "iterations.")) self._S = trainedModelOld.data.approxParameters["S"] self._approxParameters["S"] = self.S while self.samplingEngine.nsamples > self.S: self.samplingEngine.popSample() while len(self.mus) > self.S: self.mus.pop(-1) muidx, maxErrorEst = muidxOld, maxErrorEstOld break if maxErrorEst is not None: max2ErrorEst = np.max(maxErrorEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(max2ErrorEst), 5) if self.firstGreedyIter: trainedModelOld = copy(self.trainedModel) else: trainedModelOld.data = copy(self.trainedModel.data) self.firstGreedyIter = False vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(self.samplingEngine.nsamples), 5) - if (maxErrorEst is None or max2ErrorEst <= self.greedyTol - or np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst))): + if (maxErrorEst is None or np.any(np.isnan(maxErrorEst)) + or np.any(np.isinf(maxErrorEst))): while self.samplingEngine.nsamples > self.S: self.samplingEngine.popSample() while len(self.mus) > self.S: self.mus.pop(-1) - else: + elif self._postGreedyRecover: self._S = self.samplingEngine.nsamples while len(self.mus) < self.S: self.mus.append(self.samplingEngine.mus[len(self.mus)]) self.trainedModel = None self.setupApproxLocal() if plotEst == "LAST": self.plotEstimator(errorEstTest, muidx, maxErrorEst) vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 def assembleReducedResidualGramian(self, pMat:sampList): """ Build residual gramian of reduced linear system through projections. """ if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.HFEngine.innerProduct(pMat, pMat, dual = True) else: Sold = self.trainedModel.data.gramian.shape[0] S = len(self.mus) if Sold > S: gramian = self.trainedModel.data.gramian[: S, : S] else: idxOld = list(range(Sold)) idxNew = list(range(Sold, S)) gramian = np.empty((S, S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = self.HFEngine.innerProduct( pMat(idxNew), pMat(idxOld), dual = True) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = self.HFEngine.innerProduct( pMat(idxNew), pMat(idxNew), dual = True) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D]): """ Build blocks (of type bb) of reduced linear system through projections. """ nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = bs[i] resbb[i, i] = self.HFEngine.innerProduct(Mbi, Mbi, dual = True) for j in range(i): Mbj = bs[j] resbb[i, j] = self.HFEngine.innerProduct(Mbj, Mbi, dual = True) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:sampList): """ Build blocks (of type Ab) of reduced linear system through projections. """ nAs = len(As) nbs = len(bs) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) for j in range(nAs): MAj = dot(As[j], pMat) for i in range(nbs): Mbi = bs[i] resAb[i, :, j] = self.HFEngine.innerProduct(MAj, Mbi, dual = True) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == S: return if Sold > S: resAb = self.trainedModel.data.resAb[:, : S, :] else: if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): MAj = dot(As[j], pMat[:, Sold :]) for i in range(nbs): Mbi = bs[i] resAb[i, Sold :, j] = self.HFEngine.innerProduct( MAj, Mbi, dual = True) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList): """ Build blocks (of type AA) of reduced linear system through projections. """ nAs = len(As) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) for i in range(nAs): MAi = dot(As[i], pMat) resAA[:, i, :, i] = self.HFEngine.innerProduct(MAi, MAi, dual = True) for j in range(i): MAj = dot(As[j], pMat) resAA[:, i, :, j] = self.HFEngine.innerProduct(MAj, MAi, dual = True) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == S: return if Sold > S: resAA = self.trainedModel.data.resAA[: S, :, : S, :] else: if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): MAi = dot(As[i], pMat) resAA[: Sold, i, Sold :, i] = self.HFEngine.innerProduct( MAi[:, Sold :], MAi[:, : Sold], dual = True) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = self.HFEngine.innerProduct( MAi[:, Sold :], MAi[:, Sold :], dual = True) for j in range(i): MAj = dot(As[j], pMat) resAA[: Sold, i, Sold :, j] = ( self.HFEngine.innerProduct(MAj[:, Sold :], MAi[:, : Sold], dual = True)) resAA[Sold :, i, : Sold, j] = ( self.HFEngine.innerProduct(MAj[:, : Sold], MAi[:, Sold :], dual = True)) resAA[Sold :, i, Sold :, j] = ( self.HFEngine.innerProduct(MAj[:, Sold :], MAi[:, Sold :], dual = True)) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of affine decomposition of residual.""" if full: checkIfAffine(self.HFEngine, "assemble reduced residual blocks", False, self._affine_lvl) else: checkIfAffine(self.HFEngine, "assemble reduced RHS blocks", True, self._affine_lvl) self.HFEngine.buildb() self.assembleReducedResidualBlocksbb(self.HFEngine.bs) if full: pMat = self.samplingEngine.projectionMatrix self.HFEngine.buildA() self.assembleReducedResidualBlocksAb(self.HFEngine.As, self.HFEngine.bs, pMat) self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat) diff --git a/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py index 4d7356c..1a8af24 100644 --- a/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py @@ -1,500 +1,503 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname, PolynomialInterpolator as PI, polyvander) from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.degree import totalDegreeN from rrompy.utilities.expression import expressionEvaluator from rrompy.reduction_methods.standard import RationalInterpolant from rrompy.utilities.base.types import Np1D, Tuple, paramVal, List from rrompy.utilities.base.verbosity_depth import verbosityManager as vbMng from rrompy.utilities.poly_fitting import customFit from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert, RROMPy_FRAGILE) from rrompy.sampling import sampleList, emptySampleList __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': kind of snapshots orthogonalization; allowed values include 0, 1/2, and 1; defaults to 1, i.e. POD; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'samplerTrainSet': training sample points generator; defaults to sampler; - 'polybasis': type of basis for interpolation; defaults to 'MONOMIAL'; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'LOOK_AHEAD', 'LOOK_AHEAD_RES', 'LOOK_AHEAD_OUTPUT', and 'NONE'; defaults to 'NONE'; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'BARYCENTRIC[_NORM]', and 'BARYCENTRIC_AVERAGE' (check pdf in main folder for explanation); defaults to 'NORM'; - 'interpTol': tolerance for interpolation; defaults to None; - 'QTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': kind of snapshots orthogonalization; - 'scaleFactorDer': scaling factors for derivative computation; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'samplerTrainSet': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpTol': tolerance for interpolation; - 'QTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. verbosity: Verbosity level. POD: Kind of snapshots orthogonalization. scaleFactorDer: Scaling factors for derivative computation. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. samplerTrainSet: training sample points generator. errorEstimatorKind: kind of error estimator. functionalSolve: Strategy for minimization of denominator functional. interpTol: tolerance for interpolation. QTol: tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ _allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "LOOK_AHEAD", "LOOK_AHEAD_RES", "LOOK_AHEAD_OUTPUT", "NONE"] def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["errorEstimatorKind"], ["DISCREPANCY"], toBeExcluded = ["M", "N", "polydegreetype", "radialDirectionalWeights"]) super().__init__(*args, **kwargs) self._postInit() @property def E(self): """Value of E.""" self._E = self.sampleBatchIdx - 1 return self._E @E.setter def E(self, E): RROMPyWarning(("E is used just to simplify inheritance, and its value " "cannot be changed from that of sampleBatchIdx - 1.")) def _setMAuto(self): self.M = self.E def _setNAuto(self): self.N = self.E @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " "to 'NONE'.")) errorEstimatorKind = "NONE" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind def _polyvanderAuxiliary(self, mus, deg, *args): return polyvander(mus, deg, *args) def getErrorEstimatorDiscrepancy(self, mus:Np1D) -> Np1D: """Discrepancy-based residual estimator.""" checkIfAffine(self.HFEngine, "apply discrepancy-based error estimator", False, self._affine_lvl) mus = self.checkParameterList(mus) muCTest = self.trainedModel.centerNormalize(mus) tMverb, self.trainedModel.verbosity = self.trainedModel.verbosity, 0 QTest = self.trainedModel.getQVal(mus) QTzero = np.where(QTest == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTest[QTzero] = np.finfo(np.complex).eps / (1. + self.N) self.HFEngine.buildA() self.HFEngine.buildb() nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs muTrainEff = self.mapParameterList(self.mus) muTestEff = self.mapParameterList(mus) PTrain = self.trainedModel.getPVal(self.mus).data.T QTrain = self.trainedModel.getQVal(self.mus) QTzero = np.where(QTrain == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTrain[QTzero] = np.finfo(np.complex).eps / (1. + self.N) PTest = self.trainedModel.getPVal(mus).data self.trainedModel.verbosity = tMverb radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), dtype = np.complex) radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) radiusb = np.empty((nbs, len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): idxs = j * self.S + np.arange(self.S) radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], muTrainEff, (self.S, 1)) * PTrain radiusA[:, j] = PTest * expressionEvaluator(thA[0], muTestEff, (len(mus),)) for j, thb in enumerate(self.HFEngine.thbs): idx = nAs * self.S + j radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], muTrainEff, (self.S,)) radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, (len(mus),)) QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) vanTrain = self._polyvanderAuxiliary(self._musUniqueCN, self.E, self.polybasis0, self._derIdxs, self._reorder) interpPQ = customFit(vanTrain, radiusAbTrain, rcond = self.interpTol) vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis0) DradiusAb = vanTest.dot(interpPQ) radiusA = (radiusA - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) radiusb = radiusb - DradiusAb[:, - nbs :].T ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5 return err def getErrorEstimatorLookAhead(self, mus:Np1D, what : str = "") -> Tuple[Np1D, List[int]]: """Residual estimator based on look-ahead idea.""" errTest, QTest, idxMaxEst = self._EIMStep(mus) mu_muTestS = mus[idxMaxEst] - app_muTestSample = self.getApproxReduced(mu_muTestS) + app_muTestSample = self.trainedModel.getApproxReduced(mu_muTestS) if self._mode == RROMPy_FRAGILE: if what == "RES" and not self.HFEngine.isCEye: raise RROMPyException(("Cannot compute LOOK_AHEAD_RES " "estimator in fragile mode for " "non-scalar C.")) app_muTestSample = dot(self.trainedModel.data.projMat[:, : app_muTestSample.shape[0]], app_muTestSample) else: app_muTestSample = dot(self.samplingEngine.projectionMatrix, app_muTestSample) app_muTestSample = sampleList(app_muTestSample) if what == "RES": errmu = self.HFEngine.residual(mu_muTestS, app_muTestSample, post_c = False) solmu = self.HFEngine.residual(mu_muTestS, None, post_c = False) normSol = self.HFEngine.norm(solmu, dual = True) normErr = self.HFEngine.norm(errmu, dual = True) else: for j, mu in enumerate(mu_muTestS): uEx = self.samplingEngine.nextSample(mu) if what == "OUTPUT": uEx = self.HFEngine.applyC(uEx, mu) app_muTS = self.HFEngine.applyC(app_muTestSample[j], mu) if j == 0: app_muTestS = emptySampleList() app_muTestS.reset((len(app_muTS), len(mu_muTestS)), dtype = app_muTS.dtype) app_muTestS[j] = app_muTS if j == 0: solmu = emptySampleList() solmu.reset((len(uEx), len(mu_muTestS)), dtype = uEx.dtype) solmu[j] = uEx if what == "OUTPUT": app_muTestSample = app_muTestS errmu = solmu - app_muTestSample normSol = self.HFEngine.norm(solmu, is_state = what != "OUTPUT") normErr = self.HFEngine.norm(errmu, is_state = what != "OUTPUT") errsamples = normErr / normSol musT = copy(self.mus) musT.append(mu_muTestS) musT = self.trainedModel.centerNormalize(musT) musC = self.trainedModel.centerNormalize(mus) errT = np.zeros((len(musT), len(mu_muTestS)), dtype = np.complex) errT[np.arange(len(self.mus), len(musT)), np.arange(len(mu_muTestS))] = errsamples * QTest[idxMaxEst] vanT = self._polyvanderAuxiliary(musT, self.E + 1, self.polybasis) fitOut = customFit(vanT, errT, full = True, rcond = self.interpTol) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... Conditioning " "of LS system: {:.4e}.").format(len(vanT), self.E + 1, polyfitname(self.polybasis), fitOut[1][2][0] / fitOut[1][2][-1]), 15) vanC = self._polyvanderAuxiliary(musC, self.E + 1, self.polybasis) err = np.sum(np.abs(vanC.dot(fitOut[0])), axis = -1) / QTest return err, idxMaxEst def getErrorEstimatorNone(self, mus:Np1D) -> Np1D: """EIM-based residual estimator.""" err = np.max(self._EIMStep(mus, True), axis = 1) err *= self.greedyTol / np.mean(err) return err def _EIMStep(self, mus:Np1D, only_one : bool = False) -> Tuple[Np1D, Np1D, List[int]]: """Residual estimator based on look-ahead idea.""" mus = self.checkParameterList(mus) tMverb, self.trainedModel.verbosity = self.trainedModel.verbosity, 0 QTest = self.trainedModel.getQVal(mus) QTzero = np.where(QTest == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTest[QTzero] = np.finfo(np.complex).eps / (1. + self.N) QTest = np.abs(QTest) muCTest = self.trainedModel.centerNormalize(mus) muCTrain = self.trainedModel.centerNormalize(self.mus) self.trainedModel.verbosity = tMverb vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis) vanTestNext = self._polyvanderAuxiliary(muCTest, self.E + 1, self.polybasis)[:, vanTest.shape[1] :] idxsTest = np.arange(vanTestNext.shape[1]) basis = np.zeros((len(idxsTest), 0), dtype = float) idxMaxEst = [] while len(idxsTest) > 0: vanTrial = self._polyvanderAuxiliary(muCTrain, self.E, self.polybasis) vanTrialNext = self._polyvanderAuxiliary(muCTrain, self.E + 1, self.polybasis)[:, vanTrial.shape[1] :] vanTrial = np.hstack((vanTrial, vanTrialNext.dot(basis).reshape( len(vanTrialNext), basis.shape[1]))) valuesTrial = vanTrialNext[:, idxsTest] vanTestEff = np.hstack((vanTest, vanTestNext.dot(basis).reshape( len(vanTestNext), basis.shape[1]))) vanTestNextEff = vanTestNext[:, idxsTest] coeffTest = np.linalg.solve(vanTrial, valuesTrial) errTest = (np.abs(vanTestNextEff - vanTestEff.dot(coeffTest)) / np.expand_dims(QTest, 1)) if only_one: return errTest idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) idxMaxEst += [idxMaxErr[0]] muCTrain.append(muCTest[idxMaxErr[0]]) basis = np.pad(basis, [(0, 0), (0, 1)], "constant") basis[idxsTest[idxMaxErr[1]], -1] = 1. idxsTest = np.delete(idxsTest, idxMaxErr[1]) return errTest, QTest, idxMaxEst def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: """Standard residual-based error estimator.""" setupOK = self.setupApproxLocal() if setupOK > 0: err = np.empty(len(mus)) err[:] = np.nan if not return_max: return err return err, [- setupOK], np.nan mus = self.checkParameterList(mus) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) if self.errorEstimatorKind == "AFFINE": err = self.getErrorEstimatorAffine(mus) else: self._setupInterpolationIndices() if self.errorEstimatorKind == "DISCREPANCY": err = self.getErrorEstimatorDiscrepancy(mus) elif self.errorEstimatorKind[: 10] == "LOOK_AHEAD": err, idxMaxEst = self.getErrorEstimatorLookAhead(mus, self.errorEstimatorKind[11 :]) else: #if self.errorEstimatorKind == "NONE": err = self.getErrorEstimatorNone(mus) vbMng(self.trainedModel, "DEL", "Done evaluating error estimator.", 10) if not return_max: return err if self.errorEstimatorKind[: 10] != "LOOK_AHEAD": idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) errCP = copy(err) for j in range(self.sampleBatchSize): k = np.argmax(errCP) idxMaxEst[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) errCP *= np.linalg.norm(musZero.data, axis = 1) return err, idxMaxEst, err[idxMaxEst] + _warnPlottingNormalization = 1 def plotEstimator(self, *args, **kwargs): super().plotEstimator(*args, **kwargs) - if self.errorEstimatorKind == "NONE": - vbMng(self, "MAIN", - ("Warning! Error estimator has been arbitrarily normalized " - "before plotting."), 15) + if (self.errorEstimatorKind == "NONE" + and self._warnPlottingNormalization): + RROMPyWarning(("Error estimator arbitrarily normalized before " + "plotting.")) + self._warnPlottingNormalization = 0 def greedyNextSample(self, *args, **kwargs) -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") self.sampleBatchIdx += 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) err, muidx, maxErr, muNext = super().greedyNextSample(*args, **kwargs) if maxErr is not None and (np.any(np.isnan(maxErr)) or np.any(np.isinf(maxErr))): self.sampleBatchIdx -= 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) if (self.errorEstimatorKind == "NONE" and not np.isnan(maxErr) and not np.isinf(maxErr)): maxErr = None return err, muidx, maxErr, muNext def _setSampleBatch(self, maxS:int): self.sampleBatchIdx, self.sampleBatchSize, S = -1, 0, 0 nextBatchSize = 1 while S + nextBatchSize <= maxS: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) return S def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self._S = self._setSampleBatch(self.S) super()._preliminaryTraining() self.M, self.N = ("AUTO",) * 2 def setupApproxLocal(self) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") self.verbosity -= 10 vbMng(self, "INIT", "Setting up local approximant.", 5) pMat = self.samplingEngine.projectionMatrix firstRun = self.trainedModel is None if not firstRun: pMat = pMat[:, len(self.trainedModel.data.mus) :] self._setupTrainedModel(pMat, not firstRun) unstable = 0 if self.E > 0: Q = self._setupDenominator() else: Q = PI() Q.coeffs = np.ones((1,) * self.npar, dtype = np.complex) Q.npar = self.npar Q.polybasis = self.polybasis if not unstable: self._setupRational(Q) if self.M < self.E: RROMPyWarning(("Instability in numerator computation. " "Aborting.")) unstable = 1 if not unstable: self.trainedModel.data.approxParameters = copy( self.approxParameters) vbMng(self, "DEL", "Done setting up local approximant.", 5) self.verbosity += 10 return unstable def setupApprox(self, plotEst : str = "NONE") -> int: + self._postGreedyRecover = 0 val = super().setupApprox(plotEst) + self._postGreedyRecover = 1 if val == 0: - if (self.errorEstimatorKind[:10] == "LOOK_AHEAD" - and len(self.mus) < self.samplingEngine.nsamples): + if len(self.mus) < self.samplingEngine.nsamples: while len(self.mus) < self.samplingEngine.nsamples: self.mus.append(self.samplingEngine.mus[len(self.mus)]) self.trainedModel = None - self._S = self._setSampleBatch(len(self.mus) + 1) + self._S = self._setSampleBatch(len(self.mus) + 1) - 1 self.setupApproxLocal() self._setupRational(self.trainedModel.data.Q, self.trainedModel.data.P) self.trainedModel.data.approxParameters = copy( self.approxParameters) return val def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" super().loadTrainedModel(filename) self._setSampleBatch(self.S + 1) diff --git a/rrompy/reduction_methods/standard/rational_interpolant.py b/rrompy/reduction_methods/standard/rational_interpolant.py index a72e39d..9516544 100644 --- a/rrompy/reduction_methods/standard/rational_interpolant.py +++ b/rrompy/reduction_methods/standard/rational_interpolant.py @@ -1,707 +1,721 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from scipy.linalg import eig from collections.abc import Iterable from .generic_standard_approximant import GenericStandardApproximant from rrompy.utilities.poly_fitting.polynomial import ( polybases as ppb, polyfitname, polyvander as pvP, polyTimes, PolynomialInterpolator as PI, PolynomialInterpolatorNodal as PIN) from rrompy.utilities.poly_fitting.heaviside import rational2heaviside from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, paramList, interpEng) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import pseudoInverse, dot, baseDistanceMatrix from rrompy.utilities.numerical.factorials import multifactorial from rrompy.utilities.numerical.hash_derivative import (nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI) from rrompy.utilities.numerical.degree import (reduceDegreeN, degreeTotalToFull, fullDegreeMaxMask, totalDegreeMaxMask) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalInterpolant'] def polyTimesTable(P:interpEng, mus:Np1D, reorder:List[int], derIdxs:List[List[List[int]]], scl : Np1D = None) -> Np2D: """Table of polynomial products.""" if not isinstance(P, PI): raise RROMPyException(("Polynomial to evaluate must be a polynomial " "interpolator.")) Pvals = [[0.] * len(derIdx) for derIdx in derIdxs] for j, derIdx in enumerate(derIdxs): nder = len(derIdx) for der in range(nder): derI = hashI(der, P.npar) Pvals[j][der] = P([mus[j]], derI, scl) / multifactorial(derI) return blockDiagDer(Pvals, reorder, derIdxs) def vanderInvTable(vanInv:Np2D, idxs:List[int], reorder:List[int], derIdxs:List[List[List[int]]]) -> Np2D: """Table of Vandermonde pseudo-inverse.""" S = len(reorder) Ts = [None] * len(idxs) for k in range(len(idxs)): invLocs = [None] * len(derIdxs) idxGlob = 0 for j, derIdx in enumerate(derIdxs): nder = len(derIdx) idxGlob += nder idxLoc = np.arange(S)[(reorder >= idxGlob - nder) * (reorder < idxGlob)] invLocs[j] = vanInv[k, idxLoc] Ts[k] = blockDiagDer(invLocs, reorder, derIdxs, [2, 1, 0]) return Ts def blockDiagDer(vals:List[Np1D], reorder:List[int], derIdxs:List[List[List[int]]], permute : List[int] = None) -> Np2D: """Table of derivative values for point confluence.""" S = len(reorder) T = np.zeros((S, S), dtype = np.complex) if permute is None: permute = [0, 1, 2] idxGlob = 0 for j, derIdx in enumerate(derIdxs): nder = len(derIdx) idxGlob += nder idxLoc = np.arange(S)[(reorder >= idxGlob - nder) * (reorder < idxGlob)] val = vals[j] for derI, derIdxI in enumerate(derIdx): for derJ, derIdxJ in enumerate(derIdx): diffIdx = [x - y for (x, y) in zip(derIdxI, derIdxJ)] if all([x >= 0 for x in diffIdx]): diffj = hashD(diffIdx) i1, i2, i3 = np.array([derI, derJ, diffj])[permute] T[idxLoc[i1], idxLoc[i2]] = val[i3] return T class RationalInterpolant(GenericStandardApproximant): """ ROM rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': kind of snapshots orthogonalization; allowed values include 0, 1/2, and 1; defaults to 1, i.e. POD; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 'AUTO', i.e. maximum allowed; - 'N': degree of rational interpolant denominator; defaults to 'AUTO', i.e. maximum allowed; - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; defaults to 1; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; defaults to [-1, -1]; - 'functionalSolve': strategy for minimization of denominator functional; allowed values include 'NORM', 'DOMINANT', 'BARYCENTRIC[_NORM]', and 'BARYCENTRIC_AVERAGE' (check pdf in main folder for explanation); defaults to 'NORM'; - 'interpTol': tolerance for interpolation; defaults to None; - 'QTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': kind of snapshots orthogonalization; - 'scaleFactorDer': scaling factors for derivative computation; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - 'polydegreetype': type of polynomial degree; - 'radialDirectionalWeights': radial basis weights for interpolant numerator; - 'radialDirectionalWeightsAdapt': bounds for adaptive rescaling of radial basis weights; - 'functionalSolve': strategy for minimization of denominator functional; - 'interpTol': tolerance for interpolation via numpy.polyfit; - 'QTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. verbosity: Verbosity level. POD: Kind of snapshots orthogonalization. scaleFactorDer: Scaling factors for derivative computation. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. polydegreetype: Type of polynomial degree. radialDirectionalWeights: Radial basis weights for interpolant numerator. radialDirectionalWeightsAdapt: Bounds for adaptive rescaling of radial basis weights. functionalSolve: Strategy for minimization of denominator functional. interpTol: Tolerance for interpolation via numpy.polyfit. QTol: Tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ _allowedFunctionalSolveKinds = ["NORM", "DOMINANT", "BARYCENTRIC_NORM", "BARYCENTRIC_AVERAGE"] def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["polybasis", "M", "N", "polydegreetype", "radialDirectionalWeights", "radialDirectionalWeightsAdapt", "functionalSolve", "interpTol", "QTol"], ["MONOMIAL", "AUTO", "AUTO", "TOTAL", 1., [-1., -1.], "NORM", -1, 0.]) super().__init__(*args, **kwargs) self._postInit() @property def tModelType(self): from .trained_model.trained_model_rational import TrainedModelRational return TrainedModelRational @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in ppb + rbpb: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def polybasis0(self): if "_" in self.polybasis: return self.polybasis.split("_")[0] return self.polybasis @property def functionalSolve(self): """Value of functionalSolve.""" return self._functionalSolve @functionalSolve.setter def functionalSolve(self, functionalSolve): try: functionalSolve = functionalSolve.upper().strip().replace(" ","") if functionalSolve == "BARYCENTRIC": functionalSolve += "_NORM" if functionalSolve not in self._allowedFunctionalSolveKinds: raise RROMPyException(("Prescribed functionalSolve not " "recognized.")) self._functionalSolve = functionalSolve except: RROMPyWarning(("Prescribed functionalSolve not recognized. " "Overriding to 'NORM'.")) self._functionalSolve = "NORM" self._approxParameters["functionalSolve"] = self.functionalSolve @property def interpTol(self): """Value of interpTol.""" return self._interpTol @interpTol.setter def interpTol(self, interpTol): self._interpTol = interpTol self._approxParameters["interpTol"] = self.interpTol @property def radialDirectionalWeights(self): """Value of radialDirectionalWeights.""" return self._radialDirectionalWeights @radialDirectionalWeights.setter def radialDirectionalWeights(self, radialDirectionalWeights): if isinstance(radialDirectionalWeights, Iterable): radialDirectionalWeights = list(radialDirectionalWeights) else: radialDirectionalWeights = [radialDirectionalWeights] self._radialDirectionalWeights = radialDirectionalWeights self._approxParameters["radialDirectionalWeights"] = ( self.radialDirectionalWeights) @property def radialDirectionalWeightsAdapt(self): """Value of radialDirectionalWeightsAdapt.""" return self._radialDirectionalWeightsAdapt @radialDirectionalWeightsAdapt.setter def radialDirectionalWeightsAdapt(self, radialDirectionalWeightsAdapt): self._radialDirectionalWeightsAdapt = radialDirectionalWeightsAdapt self._approxParameters["radialDirectionalWeightsAdapt"] = ( self.radialDirectionalWeightsAdapt) @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if isinstance(M, str): M = M.strip().replace(" ","") if "-" not in M: M = M + "-0" self._M_isauto, self._M_shift = True, int(M.split("-")[-1]) M = 0 if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M def _setMAuto(self): self.M = max(0, reduceDegreeN(self.S, self.S, self.npar, self.polydegreetype) - self._M_shift) vbMng(self, "MAIN", "Automatically setting M to {}.".format(self.M), 25) @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if isinstance(N, str): N = N.strip().replace(" ","") if "-" not in N: N = N + "-0" self._N_isauto, self._N_shift = True, int(N.split("-")[-1]) N = 0 if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N def _setNAuto(self): self.N = max(0, reduceDegreeN(self.S, self.S, self.npar, self.polydegreetype) - self._N_shift) vbMng(self, "MAIN", "Automatically setting N to {}.".format(self.N), 25) @property def polydegreetype(self): """Value of polydegreetype.""" return self._polydegreetype @polydegreetype.setter def polydegreetype(self, polydegreetype): try: polydegreetype = polydegreetype.upper().strip().replace(" ","") if polydegreetype not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetype not " "recognized.")) self._polydegreetype = polydegreetype except: RROMPyWarning(("Prescribed polydegreetype not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetype = "TOTAL" self._approxParameters["polydegreetype"] = self.polydegreetype @property def QTol(self): """Value of tolerance for robust rational denominator management.""" return self._QTol @QTol.setter def QTol(self, QTol): if QTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) QTol = 0. self._QTol = QTol self._approxParameters["QTol"] = self.QTol def resetSamples(self): """Reset samples.""" super().resetSamples() self._musUniqueCN = None self._derIdxs = None self._reorder = None def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" if self._musUniqueCN is None or len(self._reorder) != len(self.mus): self._musUniqueCN, musIdxsTo, musIdxs, musCount = ( self.trainedModel.centerNormalize(self.mus).unique( return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.mus[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1], cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute rational denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") vbMng(self, "INIT", "Starting computation of denominator.", 7) if hasattr(self, "_N_isauto"): self._setNAuto() else: N = reduceDegreeN(self.N, self.S, self.npar, self.polydegreetype) if N < self.N: RROMPyWarning(("N too large compared to S. Reducing N by " "{}").format(self.N - N)) self.N = N while self.N > 0: if self.functionalSolve != "NORM" and self.npar > 1: RROMPyWarning(("Strategy for functional optimization must be " "'NORM' for more than one parameter. " "Overriding to 'NORM'.")) self.functionalSolve = "NORM" if (self.functionalSolve[:11] == "BARYCENTRIC" and self.N + 1 < self.S): RROMPyWarning(("Barycentric strategy cannot be applied with " "Least Squares. Overriding to 'NORM'.")) self.functionalSolve = "NORM" if self.functionalSolve[:11] == "BARYCENTRIC": invD, TN = None, None self._setupInterpolationIndices() if len(self._musUnique) != self.S: RROMPyWarning(("Barycentric functional optimization " "cannot be applied to repeated samples. " "Overriding to 'NORM'.")) self.functionalSolve = "NORM" if self.functionalSolve[:11] != "BARYCENTRIC": invD, TN = self._computeInterpolantInverseBlocks() if self.POD == 1: sampleE = self.samplingEngine.Rscale Rscaling = None elif self.POD == 1/2: sampleE = self.samplingEngine.samples_normal Rscaling = self.samplingEngine.Rscale else: sampleE = self.samplingEngine.samples Rscaling = None ev, eV = self.findeveVG(sampleE, invD, TN, Rscaling) - if self.functionalSolve[:11] == "BARYCENTRIC": - evBad = np.abs(ev) < self.QTol * np.linalg.norm(ev) - else: - evBad = np.abs(ev) < self.QTol * np.abs(ev[-1]) - nevBad = np.sum(evBad) + if self.functionalSolve[:11] == "BARYCENTRIC": break + nevBad = np.sum(np.abs(ev / ev[-1]) < self.QTol) if not nevBad: break if self.npar == 1: dN = nevBad else: #if self.npar > 1 and self.functionalSolve == "NORM": dN = self.N - reduceDegreeN(self.N, len(eV) - nevBad, self.npar, self.polydegreetype) vbMng(self, "MAIN", ("Smallest {} eigenvalue{} below tolerance. Reducing N by " "{}.").format(nevBad, "s" * (nevBad > 1), dN), 10) self.N = self.N - dN - if self.functionalSolve[:11] == "BARYCENTRIC": - eV = eV[evBad == False] - break if hasattr(self, "_gram"): del self._gram if self.N <= 0: self.N, eV = 0, np.ones((1,) * self.npar, dtype = np.complex) if self.N > 0 and self.functionalSolve[:11] == "BARYCENTRIC": q = PIN() q.polybasis, q.nodes = self.polybasis0, eV else: q = PI() q.npar, q.polybasis = self.npar, self.polybasis0 if self.polydegreetype == "TOTAL": q.coeffs = degreeTotalToFull(tuple([self.N + 1] * self.npar), self.npar, eV) else: q.coeffs = eV.reshape([self.N + 1] * self.npar) vbMng(self, "DEL", "Done computing denominator.", 7) return q def _setupNumerator(self): """Compute rational numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of numerator.", 7) self._setupInterpolationIndices() Qevaldiag = polyTimesTable(self.trainedModel.data.Q, self._musUniqueCN, self._reorder, self._derIdxs, self.scaleFactorRel) if self.POD == 1: Qevaldiag = Qevaldiag.dot(self.samplingEngine.Rscale.T) elif self.POD == 1/2: Qevaldiag = Qevaldiag * self.samplingEngine.Rscale if hasattr(self, "_M_isauto"): self._setMAuto() M = self.M else: M = reduceDegreeN(self.M, self.S, self.npar, self.polydegreetype) if M < self.M: RROMPyWarning(("M too large compared to S. Reducing M by " "{}").format(self.M - M)) self.M = M while self.M >= 0: pParRest = [self.M, self.polybasis, self.verbosity >= 5, self.polydegreetype == "TOTAL", {"derIdxs": self._derIdxs, "reorder": self._reorder, "scl": self.scaleFactorRel}] if self.polybasis in ppb: p = PI() else: self.computeScaleFactor() rDWEff = np.array([w * f for w, f in zip( self.radialDirectionalWeights, self.scaleFactor)]) pParRest = pParRest[: 2] + [rDWEff] + pParRest[2 :] pParRest[-1]["optimizeScalingBounds"] = ( self.radialDirectionalWeightsAdapt) p = RBI() if self.polybasis in ppb + rbpb: pParRest += [{"rcond": self.interpTol}] wellCond, msg = p.setupByInterpolation(self._musUniqueCN, Qevaldiag, *pParRest) vbMng(self, "MAIN", msg, 5) if wellCond: break vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing M " "by 1."), 10) self.M = self.M - 1 if self.M < 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) self.M = M vbMng(self, "DEL", "Done computing numerator.", 7) return p def setupApprox(self) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() self._setupTrainedModel(self.samplingEngine.projectionMatrix) self._setupRational(self._setupDenominator()) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) return 0 def _setupRational(self, Q:interpEng, P : interpEng = None): vbMng(self, "INIT", "Starting approximant finalization.", 5) self.trainedModel.data.Q = Q if P is None: P = self._setupNumerator() while self.N > 0 and self.npar == 1: if self.HFEngine._ignoreResidues: pls = Q.roots() cfs, projMat = None, None else: cfs, pls, _ = rational2heaviside(P, Q) cfs = cfs[: self.N].T if self.POD != 1: projMat = self.samplingEngine.projectionMatrix else: projMat = None foci = self.sampler.normalFoci() plsA = self.mapParameterList(self.mapParameterList(self.mu0)(0, 0) + self.scaleFactor * pls, "B")(0) idxBad = self.HFEngine.flagBadPolesResiduesAbsolute(plsA, cfs, projMat) if not self.HFEngine._ignoreResidues: cfs[:, idxBad] = 0. idxBad += self.HFEngine.flagBadPolesResiduesRelative(pls, cfs, projMat, foci) idxBad = idxBad > 0 if not np.any(idxBad): break vbMng(self, "MAIN", "Removing {} spurious pole{} out of {}.".format( np.sum(idxBad), "s" * (np.sum(idxBad) > 1), self.N), 10) if isinstance(Q, PIN): Q.nodes = Q.nodes[idxBad == False] else: Q = PI() Q.npar = self.npar Q.polybasis = self.polybasis0 Q.coeffs = np.ones(1, dtype = np.complex) for pl in pls[idxBad == False]: Q.coeffs = polyTimes(Q.coeffs, [- pl, 1.], Pbasis = Q.polybasis, Rbasis = Q.polybasis) Q.coeffs /= np.linalg.norm(Q.coeffs) self.trainedModel.data.Q = Q self.N = Q.deg[0] P = self._setupNumerator() self.trainedModel.data.P = P vbMng(self, "DEL", "Terminated approximant finalization.", 5) def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupInterpolationIndices() pvPPar = [self.polybasis0, self._derIdxs, self._reorder, self.scaleFactorRel] full = self.N + 1 == self.S == len(self._musUniqueCN) if full: mus = self._musUniqueCN[self._reorder] dist = baseDistanceMatrix(mus, magnitude = False)[..., 0] dist[np.arange(self.N + 1), np.arange(self.N + 1)] = multifactorial([self.N]) fitinvE = np.prod(dist, axis = 1) ** -1 vbMng(self, "MAIN", ("Evaluating quasi-Lagrangian basis of degree {} at {} " "sample points.").format(self.N, self.N + 1), 5) invD = [np.diag(fitinvE)] TN = pvP(self._musUniqueCN, self.N, *pvPPar) else: while self.N >= 0: if self.polydegreetype == "TOTAL": Neff = self.N idxsB = totalDegreeMaxMask(self.N, self.npar) else: #if self.polydegreetype == "FULL": Neff = [self.N] * self.npar idxsB = fullDegreeMaxMask(self.N, self.npar) TN = pvP(self._musUniqueCN, Neff, *pvPPar) fitOut = pseudoInverse(TN, rcond = self.interpTol, full = True) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... " "Conditioning of pseudoinverse system: {:.4e}.").format( TN.shape[0], self.N, polyfitname(self.polybasis0), fitOut[1][1][0] / fitOut[1][1][-1]), 5) if fitOut[1][0] == TN.shape[1]: fitinv = fitOut[0][idxsB, :] break vbMng(self, "MAIN", "Polyfit is poorly conditioned. Reducing N by 1.", 10) self.N = self.N - 1 if self.N < 0: raise RROMPyException(("Instability in computation of " "denominator. Aborting.")) invD = vanderInvTable(fitinv, idxsB, self._reorder, self._derIdxs) return invD, TN def findeveVG(self, sampleE:Np2D, invD:List[Np2D], TN:Np2D, Rscaling : Np1D = None) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of rational denominator matrix, or of its right chol factor if POD. """ RROMPyAssert(self._mode, message = "Cannot solve spectral problem.") if self.POD == 1: if self.functionalSolve[:11] == "BARYCENTRIC": Rstack = sampleE else: vbMng(self, "INIT", "Building generalized half-gramian.", 10) S, eWidth = sampleE.shape[0], len(invD) Rstack = np.zeros((S * eWidth, TN.shape[1]), dtype = np.complex) for k in range(eWidth): Rstack[k * S : (k + 1) * S, :] = dot(sampleE, dot(invD[k], TN)) vbMng(self, "DEL", "Done building half-gramian.", 10) _, s, Vh = np.linalg.svd(Rstack, full_matrices = False) - ev, eV = s[::-1], Vh[::-1].T.conj() + evG, eVG = s[::-1], Vh[::-1].T.conj() evExp, probKind = -2., "svd " else: if not hasattr(self, "_gram"): vbMng(self, "INIT", "Building gramian matrix.", 10) self._gram = self.HFEngine.innerProduct(sampleE, sampleE, is_state = True) if Rscaling is not None: self._gram = (self._gram.T * Rscaling.conj()).T * Rscaling vbMng(self, "DEL", "Done building gramian.", 10) if self.functionalSolve[:11] == "BARYCENTRIC": G = self._gram else: vbMng(self, "INIT", "Building generalized gramian.", 10) G = np.zeros((TN.shape[1],) * 2, dtype = np.complex) for k in range(len(invD)): iDkN = dot(invD[k], TN) G += dot(dot(self._gram, iDkN).T, iDkN.conj()).T vbMng(self, "DEL", "Done building gramian.", 10) - ev, eV = np.linalg.eigh(G) + evG, eVG = np.linalg.eigh(G) evExp, probKind = -1., "eigen" if (self.functionalSolve in ["NORM", "BARYCENTRIC_NORM"] - or np.sum(np.abs(ev) < np.finfo(float).eps * np.abs(ev[-1]) - * len(ev)) == 1): - eV = eV[:, 0] + or np.sum(np.abs(evG) < np.finfo(float).eps * np.abs(evG[-1]) + * len(evG)) == 1): + eV = eVG[:, 0] elif self.functionalSolve == "BARYCENTRIC_AVERAGE": - eV = eV.dot(ev ** evExp * np.sum(eV, axis = 0).conj()) + eV = eVG.dot(evG ** evExp * np.sum(eVG, axis = 0).conj()) else: - eV = eV.dot(ev ** evExp * eV[0].conj()) - ev = ev[1 :] + eV = eVG.dot(evG ** evExp * eVG[0].conj()) vbMng(self, "MAIN", ("Solved {}problem of size {} with condition number " - "{:.4e}.").format(probKind, len(ev), ev[-1] / ev[0]), 5) + "{:.4e}.").format(probKind, len(evG) - 1, evG[-1] / evG[1]), 5) if self.functionalSolve[:11] == "BARYCENTRIC": - N = len(eV) - arrow = np.zeros((N + 1,) * 2, dtype = np.complex) + S, mus = len(eV), self._musUniqueCN[self._reorder].flatten() + arrow = np.zeros((S + 1,) * 2, dtype = np.complex) arrow[1 :, 0] = 1. arrow[0, 1 :] = eV - arrow[np.arange(1, N + 1), - np.arange(1, N + 1)] = self._musUniqueCN[self._reorder[: N] - ].flatten() - active = np.eye(N + 1) + arrow[np.arange(1, S + 1), np.arange(1, S + 1)] = mus + active = np.eye(S + 1) active[0, 0] = 0. - Aev, AeV = eig(arrow, active) - AevGood = np.isinf(Aev) + np.isnan(Aev) == False - eV, AeV = Aev[AevGood], AeV[:, AevGood] - ev = np.sum(np.abs(arrow.dot(AeV) - eV * active.dot(AeV)) ** 2., - axis = 0) ** .5 - return ev, eV + poles, qTm1 = eig(arrow, active) + eVgood = np.isinf(poles) + np.isnan(poles) == False + poles = poles[eVgood] + self.N = len(poles) + if self.QTol > 0: + # compare optimal score with self.N poles to those obtained + # by removing one of the poles + qTm1 = qTm1[1 :, eVgood].conj() ** -1. + dists = mus.reshape(-1, 1) - mus + dists[np.arange(S), np.arange(S)] = multifactorial([self.N]) + dists = np.prod(dists, axis = 1).conj() ** -1. + qComp = np.empty((self.N + 1, S), dtype = np.complex) + qComp[0] = dists * np.prod(qTm1, axis = 1) + for j in range(self.N): + qTmj = np.prod(qTm1[:, np.arange(self.N) != j], axis = 1) + qComp[j + 1] = dists * qTmj + Lqs = qComp.dot(eVG) + scores = np.real(np.sum(Lqs * evG ** -evExp * Lqs.conj(), + axis = 1)) + evBad = scores[1 :] < self.QTol * scores[0] + nevBad = np.sum(evBad) + if nevBad: + vbMng(self, "MAIN", + ("Suboptimal pole{} detected. Reducing N by " + "{}.").format("s" * (nevBad > 1), nevBad), 10) + self.N = self.N - nevBad + poles = poles[evBad == False] + eV = poles + return evG[1 :], eV def getResidues(self, *args, **kwargs) -> Tuple[paramList, Np2D]: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues(*args, **kwargs) diff --git a/rrompy/solver/fenics/fenics_norms.py b/rrompy/solver/fenics/fenics_norms.py index 65c2bbe..ae73005 100644 --- a/rrompy/solver/fenics/fenics_norms.py +++ b/rrompy/solver/fenics/fenics_norms.py @@ -1,116 +1,116 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from scipy.sparse import block_diag import fenics as fen from rrompy.utilities.base.types import Np2D, FenFunc, DictAny, FenFuncSpace from rrompy.solver.norm_utilities import (Np2DLikeInv, Np2DLikeInvLowRank) from .fenics_la import fenics2Sparse __all__ = ['L2NormMatrix', 'L2InverseNormMatrix', 'H1NormMatrix', 'Hminus1NormMatrix', 'augmentedH1NormMatrix', 'augmentedHminus1NormMatrix', 'elasticNormMatrix', 'elasticDualNormMatrix', 'augmentedElasticNormMatrix', 'augmentedElasticDualNormMatrix'] def L2NormMatrix(V:FenFuncSpace, r_ : FenFunc = 1.) -> Np2D: u = fen.TrialFunction(V) v = fen.TestFunction(V) - return fenics2Sparse(r_ * fen.dot(u, v) * fen.dx) + return fenics2Sparse(r_ * u * v * fen.dx) def L2InverseNormMatrix(V:FenFuncSpace, r_ : FenFunc = 1., solverType : str = "SPSOLVE", solverArgs : DictAny = {}, compressRank : int = None, compressOversampling : int = 10, compressSeed : int = 420) -> Np2D: L2Mat = L2NormMatrix(V, r_) if compressRank is None: return Np2DLikeInv(L2Mat, 1., solverType, solverArgs) return Np2DLikeInvLowRank(L2Mat, 1., solverType, solverArgs, compressRank, compressOversampling, compressSeed) def H1NormMatrix(V:FenFuncSpace, w : float = 0., r_ : FenFunc = 1., a_ : FenFunc = 1.) -> Np2D: u = fen.TrialFunction(V) v = fen.TestFunction(V) - return fenics2Sparse((w * r_ * fen.dot(u, v) - + fen.dot(a_ * fen.grad(u), fen.grad(v))) * fen.dx) + return fenics2Sparse((w * r_ * u * v + + fen.inner(a_ * fen.grad(u), fen.grad(v))) * fen.dx) def Hminus1NormMatrix(V:FenFuncSpace, w : float = 0., r_ : FenFunc = 1., a_ : FenFunc = 1., solverType : str = "SPSOLVE", solverArgs : DictAny = {}, compressRank : int = None, compressOversampling : int = 10, compressSeed : int = 420) -> Np2D: H1Mat = H1NormMatrix(V, w, r_, a_) if compressRank is None: return Np2DLikeInv(H1Mat, 1., solverType, solverArgs) return Np2DLikeInvLowRank(H1Mat, 1., solverType, solverArgs, compressRank, compressOversampling, compressSeed) def augmentedH1NormMatrix(V:FenFuncSpace, r_ : FenFunc = 1., a_ : FenFunc = 1.) -> Np2D: return block_diag((H1NormMatrix(V, a_ = a_), L2NormMatrix(V, r_)), format = "csr") def augmentedHminus1NormMatrix(V:FenFuncSpace, r_ : FenFunc = 1., a_ : FenFunc = 1., solverType : str = "SPSOLVE", solverArgs : DictAny = {}, compressRank : int = None, compressOversampling : int = 10, compressSeed : int = 420) -> Np2D: H1Mat = augmentedH1NormMatrix(V, r_, a_) if compressRank is None: return Np2DLikeInv(H1Mat, 1., solverType, solverArgs) return Np2DLikeInvLowRank(H1Mat, 1., solverType, solverArgs, compressRank, compressOversampling, compressSeed) def elasticNormMatrix(V:FenFuncSpace, l_:FenFunc, m_:FenFunc, w : float = 0., r_ : FenFunc = 1.) -> Np2D: u = fen.TrialFunction(V) v = fen.TestFunction(V) epsilon = lambda f: 0.5 * (fen.grad(f) + fen.nabla_grad(f)) sigma = (l_ * fen.div(u) * fen.Identity(u.geometric_dimension()) + 2. * m_ * epsilon(u)) - return fenics2Sparse((w * r_ * fen.dot(u, v) + return fenics2Sparse((w * r_ * fen.inner(u, v) + fen.inner(sigma, epsilon(v))) * fen.dx) def elasticDualNormMatrix(V:FenFuncSpace, l_:FenFunc, m_:FenFunc, w : float = 0., r_ : FenFunc = 1., solverType : str = "SPSOLVE", solverArgs : DictAny = {}, compressRank : int = None, compressOversampling : int = 10, compressSeed : int = 420) -> Np2D: elMat = elasticNormMatrix(V, l_, m_, w, r_) if compressRank is None: return Np2DLikeInv(elMat, 1., solverType, solverArgs) return Np2DLikeInvLowRank(elMat, 1., solverType, solverArgs, compressRank, compressOversampling, compressSeed) def augmentedElasticNormMatrix(V:FenFuncSpace, l_:FenFunc, m_:FenFunc, r_ : FenFunc = 1.) -> Np2D: return block_diag((elasticNormMatrix(V, l_, m_), L2NormMatrix(V, r_)), format = "csr") def augmentedElasticDualNormMatrix(V:FenFuncSpace, l_:FenFunc, m_:FenFunc, r_ : FenFunc = 1., solverType : str = "SPSOLVE", solverArgs : DictAny = {}, compressRank : int = None, compressOversampling : int = 10, compressSeed : int = 420) -> Np2D: elMat = augmentedElasticNormMatrix(V, l_, m_, r_) if compressRank is None: return Np2DLikeInv(elMat, 1., solverType, solverArgs) return Np2DLikeInvLowRank(elMat, 1., solverType, solverArgs, compressRank, compressOversampling, compressSeed) diff --git a/rrompy/utilities/expression/monomial_creator.py b/rrompy/utilities/expression/monomial_creator.py index 08bb1c8..665da93 100644 --- a/rrompy/utilities/expression/monomial_creator.py +++ b/rrompy/utilities/expression/monomial_creator.py @@ -1,64 +1,65 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from collections.abc import Iterable from rrompy.utilities.numerical.factorials import multibinom from rrompy.utilities.numerical.hash_derivative import (nextDerivativeIndices, hashIdxToDerivative as hashI, hashDerivativeToIdx as hashD) from rrompy.utilities.base.types import List, TupleAny __all__ = ["createMonomial", "createMonomialList"] def createMonomial(deg:List[int], return_derivatives : bool = False) -> List[List[TupleAny]]: if not isinstance(deg, Iterable): deg = [deg] dim = len(deg) degj = hashD(deg) expr = [] for k in range(degj * return_derivatives + 1): degder = hashI(k, dim) derdiff = [x - y for (x, y) in zip(deg, degder)] if all([d >= 0 for d in derdiff]): mult = multibinom(deg, degder) activex = np.where(derdiff)[0] if len(activex) == 0: exprLoc = (mult,) else: if len(activex) == 1: - activex = activex[0] - exprLoc = ("x", "()", activex, "**", derdiff[activex]) + exprLoc = ("x", "()", activex[0]) + expLoc = derdiff[activex[0]] + if expLoc > 1: exprLoc += ("**", expLoc) else: exprLoc = ("prod", {"axis" : 1}, ("x", "**", derdiff)) if not np.isclose(mult, 1, atol = 1e-10): - exprLoc = exprLoc + ("*", mult,) + exprLoc = exprLoc + ("*", mult) expr += [exprLoc] else: expr += [(0.,)] if return_derivatives: expr += [None] return expr def createMonomialList(n:int, dim:int, return_derivatives : bool = False) -> List[List[TupleAny]]: derIdxs = nextDerivativeIndices([], dim, n) idxList = [] for j, der in enumerate(derIdxs): idxList += [createMonomial(der, return_derivatives)] return idxList diff --git a/rrompy/utilities/numerical/compress_matrix.py b/rrompy/utilities/numerical/compress_matrix.py index 49684cb..0729d3f 100644 --- a/rrompy/utilities/numerical/compress_matrix.py +++ b/rrompy/utilities/numerical/compress_matrix.py @@ -1,39 +1,39 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.numerical.tensor_la import dot from rrompy.utilities.base.types import Np2D, Tuple, HFEng __all__ = ["compressMatrix"] def compressMatrix(A:Np2D, tol : float = 0., HFEngine : HFEng = None, is_state : bool = True) -> Tuple[Np2D, Np2D, float]: """Compress matrix by SVD.""" - if HFEngine is None or not is_state: + if HFEngine is None: U, s, _ = np.linalg.svd(A.T.conj().dot(A)) else: U, s, _ = np.linalg.svd(HFEngine.innerProduct(A, A, is_state = is_state)) remove = np.where(s < tol * s[0])[0] ncut = len(s) if len(remove) == 0 else remove[0] sums = np.sum(s) s = s[: ncut] ** .5 R = (U[:, : ncut].conj() * s).T U = dot(A, U[:, : ncut] * s ** -1.) return U, R, 1. - np.linalg.norm(s) / sums diff --git a/tests/3_reduction_methods_1D/rational_interpolant_greedy_1d.py b/tests/3_reduction_methods_1D/rational_interpolant_greedy_1d.py index 97a417b..f01873b 100644 --- a/tests/3_reduction_methods_1D/rational_interpolant_greedy_1d.py +++ b/tests/3_reduction_methods_1D/rational_interpolant_greedy_1d.py @@ -1,94 +1,94 @@ # Copyright (C) 2018-2020 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from matrix_fft import matrixFFT from rrompy.reduction_methods import RationalInterpolantGreedy as RIG from rrompy.parameter.parameter_sampling import QuadratureSampler as QS def test_lax_tolerance(capsys): mu = 2.25 solver = matrixFFT() params = {"POD": True, "sampler": QS([1.5, 6.5], "UNIFORM"), "S": 4, "polybasis": "CHEBYSHEV", "greedyTol": 1e-2, "errorEstimatorKind": "LOOK_AHEAD", "samplerTrainSet": QS([1.5, 6.5], "CHEBYSHEV")} approx = RIG(solver, 4, approxParameters = params, verbosity = 100) approx.setupApprox() out, err = capsys.readouterr() assert "Done computing snapshots (final snapshot count: 11)." in out assert len(err) == 0 - assert np.isclose(approx.normErr(mu)[0], 2.169678e-4, rtol = 1e-1) + assert np.isclose(approx.normErr(mu)[0], 4.67e-05, rtol = 1e-1) def test_samples_at_poles(): solver = matrixFFT() params = {"POD": True, "sampler": QS([1.5, 6.5], "UNIFORM"), "S": 4, "nTestPoints": 100, "polybasis": "CHEBYSHEV", "greedyTol": 1e-5, "errorEstimatorKind": "AFFINE", "samplerTrainSet": QS([1.5, 6.5], "CHEBYSHEV")} approx = RIG(solver, 4., approxParameters = params, verbosity = 0) approx.setupApprox() for mu in approx.mus: assert np.isclose(approx.normErr(mu)[0] / (1e-15+approx.normHF(mu)[0]), 0., atol = 1e-4) poles = approx.getPoles() for lambda_ in range(2, 7): assert np.isclose(np.min(np.abs(poles - lambda_)), 0., atol = 1e-3) assert np.isclose(np.min(np.abs(np.array(approx.mus(0)) - lambda_)), 0., atol = 1e-1) def test_maxIter(): solver = matrixFFT() params = {"POD": True, "sampler": QS([1.5, 6.5], "UNIFORM"), "S": 5, "nTestPoints": 500, "polybasis": "CHEBYSHEV", "greedyTol": 1e-6, "maxIter": 10, "errorEstimatorKind": "LOOK_AHEAD_RES", "samplerTrainSet": QS([1.5, 6.5], "CHEBYSHEV")} approx = RIG(solver, 4., approxParameters = params, verbosity = 0) approx.input = lambda: "N" approx.setupApprox() assert len(approx.mus) == 10 _, _, maxEst = approx.errorEstimator(approx.muTest, True) assert maxEst > 1e-6 def test_load_copy(capsys): mu = 3. solver = matrixFFT() params = {"POD": True, "sampler": QS([1.5, 6.5], "UNIFORM"), "S": 4, "nTestPoints": 100, "polybasis": "CHEBYSHEV", "greedyTol": 1e-5, "errorEstimatorKind": "AFFINE", "samplerTrainSet": QS([1.5, 6.5], "CHEBYSHEV")} approx1 = RIG(solver, 4., approxParameters = params, verbosity = 100) approx1.setupApprox() err1 = approx1.normErr(mu)[0] out, err = capsys.readouterr() assert "Solving HF model for mu =" in out assert len(err) == 0 approx2 = RIG(solver, 4., approxParameters = params, verbosity = 100) approx2.setTrainedModel(approx1) approx2.setHF(mu, approx1.uHF) err2 = approx2.normErr(mu)[0] out, err = capsys.readouterr() assert "Solving HF model for mu =" not in out assert len(err) == 0 assert np.isclose(err1, err2, rtol = 1e-10)