diff --git a/rrompy/hfengines/base/__init__.py b/rrompy/hfengines/base/__init__.py
index 6307374..8fb603b 100644
--- a/rrompy/hfengines/base/__init__.py
+++ b/rrompy/hfengines/base/__init__.py
@@ -1,38 +1,41 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from .boundary_conditions import BoundaryConditions
from .fenics_engine_base import FenicsEngineBase
from .hfengine_base import HFEngineBase
-from .linear_affine_engine import LinearAffineEngine
+from .linear_affine_engine import LinearAffineEngine, checkIfAffine
+from .linear_nonaffine_engine import LinearNonAffineEngine
from .marginal_proxy_engine import MarginalProxyEngine
from .numpy_engine_base import NumpyEngineBase
from .vector_fenics_engine_base import VectorFenicsEngineBase
__all__ = [
'BoundaryConditions',
'FenicsEngineBase',
'HFEngineBase',
'LinearAffineEngine',
+ 'checkIfAffine',
+ 'LinearNonAffineEngine',
'MarginalProxyEngine',
'NumpyEngineBase',
'VectorFenicsEngineBase'
]
diff --git a/rrompy/hfengines/base/linear_affine_engine.py b/rrompy/hfengines/base/linear_affine_engine.py
index f2b3d0b..acec106 100644
--- a/rrompy/hfengines/base/linear_affine_engine.py
+++ b/rrompy/hfengines/base/linear_affine_engine.py
@@ -1,187 +1,195 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from abc import abstractmethod
import numpy as np
import scipy.sparse as scsp
from copy import deepcopy as copy
from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, TupleAny,
paramVal)
from rrompy.utilities.expression import (expressionEvaluator, createMonomial,
createMonomialList)
from rrompy.utilities.numerical import hashDerivativeToIdx as hashD
from rrompy.utilities.exception_manager import RROMPyException
-__all__ = ['LinearAffineEngine']
+__all__ = ['LinearAffineEngine', 'checkIfAffine']
class LinearAffineEngine:
"""Generic solver for affine parametric problems."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._affinePoly = True
self.nAs, self.nbs = 1, 1
@property
def affinePoly(self):
return self._affinePoly
@property
def nAs(self):
"""Value of nAs."""
return self._nAs
@nAs.setter
def nAs(self, nAs):
nAsOld = self._nAs if hasattr(self, "_nAs") else -1
if nAs != nAsOld:
self._nAs = nAs
self.resetAs()
@property
def nbs(self):
"""Value of nbs."""
return self._nbs
@nbs.setter
def nbs(self, nbs):
nbsOld = self._nbs if hasattr(self, "_nbs") else -1
if nbs != nbsOld:
self._nbs = nbs
self.resetbs()
@property
def spacedim(self):
if (hasattr(self, "bs") and hasattr(self.bs, "__len__")
and self.bs[0] is not None):
return len(self.bs[0])
return super().spacedim
def getMonomialSingleWeight(self, deg:List[int]):
return createMonomial(deg, True)
def getMonomialWeights(self, n:int):
return createMonomialList(n, self.npar, True)
def setAs(self, As:List[Np2D]):
"""Assign terms of operator of linear system."""
if len(As) != self.nAs:
raise RROMPyException(("Expected number {} of terms of As not "
"matching given list length {}.").format(self.nAs,
len(As)))
self.As = [copy(A) for A in As]
def setthAs(self, thAs:List[List[TupleAny]]):
"""Assign terms of operator of linear system."""
if len(thAs) != self.nAs:
raise RROMPyException(("Expected number {} of terms of thAs not "
"matching given list length {}.").format(self.nAs,
len(thAs)))
self.thAs = copy(thAs)
def setbs(self, bs:List[Np1D]):
"""Assign terms of RHS of linear system."""
if len(bs) != self.nbs:
raise RROMPyException(("Expected number {} of terms of bs not "
"matching given list length {}.").format(self.nbs,
len(bs)))
self.bs = [copy(b) for b in bs]
def setthbs(self, thbs:List[List[TupleAny]]):
"""Assign terms of RHS of linear system."""
if len(thbs) != self.nbs:
raise RROMPyException(("Expected number {} of terms of thbs not "
"matching given list length {}.").format(self.nbs,
len(thbs)))
self.thbs = copy(thbs)
def resetAs(self):
"""Reset (derivatives of) operator of linear system."""
if hasattr(self, "_nAs"):
self.setAs([None] * self.nAs)
self.setthAs([None] * self.nAs)
def resetbs(self):
"""Reset (derivatives of) RHS of linear system."""
if hasattr(self, "_nbs"):
self.setbs([None] * self.nbs)
self.setthbs([None] * self.nbs)
def _assembleObject(self, mu:paramVal, objs:ListAny, th:ListAny,
derI:int) -> Np2D:
"""Assemble (derivative of) object from list of derivatives."""
mu = self.checkParameter(mu)
rExp = self.rescalingExp
muE = mu ** rExp
obj = None
for j in range(len(objs)):
if len(th[j]) <= derI and th[j][-1] is not None:
raise RROMPyException(("Cannot assemble operator. Non enough "
"derivatives of theta provided."))
if len(th[j]) > derI and th[j][derI] is not None:
expr = expressionEvaluator(th[j][derI], muE)
if hasattr(expr, "__len__"):
if len(expr) > 1:
raise RROMPyException(("Size mismatch in value of "
"theta function. Only scalars "
"allowed."))
expr = expr[0]
if obj is None:
obj = expr * objs[j]
else:
obj = obj + expr * objs[j]
return obj
@abstractmethod
def buildA(self):
"""Build terms of operator of linear system."""
if self.thAs[0] is None: self.thAs = self.getMonomialWeights(self.nAs)
if self.As[0] is None:
self.As[0] = scsp.eye(self.spacedim, dtype = np.complex,
format = "csr")
for j in range(1, self.nAs):
if self.As[j] is None: self.As[j] = self.baselineA()
def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
derI = hashD(der) if hasattr(der, "__len__") else der
if derI < 0 or derI > self.nAs - 1: return self.baselineA()
self.buildA()
assembledA = self._assembleObject(mu, self.As, self.thAs, derI)
if assembledA is None: return self.baselineA()
return assembledA
@abstractmethod
def buildb(self):
"""Build terms of RHS of linear system."""
if self.thbs[0] is None: self.thbs = self.getMonomialWeights(self.nbs)
for j in range(self.nbs):
if self.bs[j] is None: self.bs[j] = self.baselineb()
def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
derI = hashD(der) if hasattr(der, "__len__") else der
if derI < 0 or derI > self.nbs - 1: return self.baselineb()
self.buildb()
assembledb = self._assembleObject(mu, self.bs, self.thbs, derI)
if assembledb is None: return self.baselineb()
return assembledb
+
+def checkIfAffine(engine, msg : str = "apply method", justb : bool = False):
+ msg = ("Cannot {} because of non-affine parametric dependence{}. Consider "
+ "using DEIM to define a new engine.").format(msg, " of RHS" * justb)
+ isAffine = (issubclass(engine.__class__, LinearAffineEngine)
+ or ((justb and hasattr(engine, "buildA"))
+ and hasattr(engine, "buildb")))
+ if not isAffine: raise RROMPyException(msg)
diff --git a/rrompy/hfengines/base/linear_nonaffine_engine.py b/rrompy/hfengines/base/linear_nonaffine_engine.py
new file mode 100644
index 0000000..9d4af74
--- /dev/null
+++ b/rrompy/hfengines/base/linear_nonaffine_engine.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2018 by the RROMPy authors
+#
+# This file is part of RROMPy.
+#
+# RROMPy is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# RROMPy is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with RROMPy. If not, see .
+#
+
+from abc import abstractmethod
+from rrompy.utilities.base.types import Np1D, Np2D, List, paramVal
+from rrompy.utilities.numerical import hashDerivativeToIdx as hashD
+
+__all__ = ['LinearNonAffineEngine']
+
+class LinearNonAffineEngine:
+ """Generic solver for non-affine parametric problems."""
+
+ @abstractmethod
+ def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D:
+ """
+ Assemble terms of operator of linear system and return it (or its
+ derivative) at a given parameter.
+ """
+ derI = hashD(der) if hasattr(der, "__len__") else der
+ A = self.baselineA()
+ if derI < 0: return A
+ return A
+
+ @abstractmethod
+ def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D:
+ """
+ Assemble terms of RHS of linear system and return it (or its
+ derivative) at a given parameter.
+ """
+ derI = hashD(der) if hasattr(der, "__len__") else der
+ b = self.baselineb()
+ if derI < 0: return b
+ return b
diff --git a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py
index 77b35c2..6ead3a8 100644
--- a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py
+++ b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py
@@ -1,689 +1,702 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
+from rrompy.hfengines.base.linear_affine_engine import checkIfAffine
from rrompy.reduction_methods.standard.generic_standard_approximant \
import GenericStandardApproximant
from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple,
List, normEng, paramVal, paramList,
sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import dot
from rrompy.utilities.expression import expressionEvaluator
from rrompy.solver import normEngine
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList, emptyParameterList
__all__ = ['GenericGreedyApproximant']
def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D:
return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)])
- badmus[..., np.newaxis].T, axis = 1)
def pruneSamples(mus:paramList, badmus:paramList,
tol : float = 1e-8) -> Np1D:
"""Remove from mus all the elements which are too close to badmus."""
if len(badmus) == 0: return mus
proximity = np.min(localL2Distance(mus.data, badmus.data), axis = 1)
return np.arange(len(mus))[proximity <= tol]
class GenericGreedyApproximant(GenericStandardApproximant):
"""
ROM greedy interpolant computation for parametric problems
(ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
defaults to 0.;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to sampler.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: Uniform error tolerance for greedy algorithm.
collinearityTol: Collinearity tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
TOL_INSTABILITY = 1e-6
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["greedyTol", "collinearityTol",
"interactive", "maxIter", "refinementRatio",
"nTestPoints"],
[1e-2, 0., False, 1e2, .2, 5e2],
["trainSetGenerator"], ["AUTO"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = approx_state, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
@property
def greedyTol(self):
"""Value of greedyTol."""
return self._greedyTol
@greedyTol.setter
def greedyTol(self, greedyTol):
if greedyTol < 0:
raise RROMPyException("greedyTol must be non-negative.")
if hasattr(self, "_greedyTol") and self.greedyTol is not None:
greedyTolold = self.greedyTol
else:
greedyTolold = -1
self._greedyTol = greedyTol
self._approxParameters["greedyTol"] = self.greedyTol
if greedyTolold != self.greedyTol:
self.resetSamples()
@property
def collinearityTol(self):
"""Value of collinearityTol."""
return self._collinearityTol
@collinearityTol.setter
def collinearityTol(self, collinearityTol):
if collinearityTol < 0:
raise RROMPyException("collinearityTol must be non-negative.")
if (hasattr(self, "_collinearityTol")
and self.collinearityTol is not None):
collinearityTolold = self.collinearityTol
else:
collinearityTolold = -1
self._collinearityTol = collinearityTol
self._approxParameters["collinearityTol"] = self.collinearityTol
if collinearityTolold != self.collinearityTol:
self.resetSamples()
@property
def interactive(self):
"""Value of interactive."""
return self._interactive
@interactive.setter
def interactive(self, interactive):
self._interactive = interactive
@property
def maxIter(self):
"""Value of maxIter."""
return self._maxIter
@maxIter.setter
def maxIter(self, maxIter):
if maxIter <= 0: raise RROMPyException("maxIter must be positive.")
if hasattr(self, "_maxIter") and self.maxIter is not None:
maxIterold = self.maxIter
else:
maxIterold = -1
self._maxIter = maxIter
self._approxParameters["maxIter"] = self.maxIter
if maxIterold != self.maxIter:
self.resetSamples()
@property
def refinementRatio(self):
"""Value of refinementRatio."""
return self._refinementRatio
@refinementRatio.setter
def refinementRatio(self, refinementRatio):
if refinementRatio <= 0. or refinementRatio > 1.:
raise RROMPyException(("refinementRatio must be between 0 "
"(excluded) and 1."))
if (hasattr(self, "_refinementRatio")
and self.refinementRatio is not None):
refinementRatioold = self.refinementRatio
else:
refinementRatioold = -1
self._refinementRatio = refinementRatio
self._approxParameters["refinementRatio"] = self.refinementRatio
if refinementRatioold != self.refinementRatio:
self.resetSamples()
@property
def nTestPoints(self):
"""Value of nTestPoints."""
return self._nTestPoints
@nTestPoints.setter
def nTestPoints(self, nTestPoints):
if nTestPoints <= 0:
raise RROMPyException("nTestPoints must be positive.")
if not np.isclose(nTestPoints, np.int(nTestPoints)):
raise RROMPyException("nTestPoints must be an integer.")
nTestPoints = np.int(nTestPoints)
if hasattr(self, "_nTestPoints") and self.nTestPoints is not None:
nTestPointsold = self.nTestPoints
else:
nTestPointsold = -1
self._nTestPoints = nTestPoints
self._approxParameters["nTestPoints"] = self.nTestPoints
if nTestPointsold != self.nTestPoints:
self.resetSamples()
@property
def trainSetGenerator(self):
"""Value of trainSetGenerator."""
return self._trainSetGenerator
@trainSetGenerator.setter
def trainSetGenerator(self, trainSetGenerator):
if (isinstance(trainSetGenerator, (str,))
and trainSetGenerator.upper() == "AUTO"):
trainSetGenerator = self.sampler
if 'generatePoints' not in dir(trainSetGenerator):
raise RROMPyException("trainSetGenerator type not recognized.")
if (hasattr(self, '_trainSetGenerator')
and self.trainSetGenerator not in [None, "AUTO"]):
trainSetGeneratorOld = self.trainSetGenerator
self._trainSetGenerator = trainSetGenerator
self._approxParameters["trainSetGenerator"] = self.trainSetGenerator
if (not 'trainSetGeneratorOld' in locals()
or trainSetGeneratorOld != self.trainSetGenerator):
self.resetSamples()
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._mus = emptyParameterList()
def initEstimatorNormEngine(self, normEngn : normEng = None):
"""Initialize estimator norm engine."""
if (normEngn is not None or not hasattr(self, "estimatorNormEngine")
or self.estimatorNormEngine is None):
if normEngn is None:
if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"):
self.HFEngine.buildEnergyNormPartialDualForm()
estimatorEnergyMatrix = (
self.HFEngine.energyNormPartialDualMatrix)
else:
if hasattr(normEngn, "buildEnergyNormPartialDualForm"):
if not hasattr(normEngn, "energyNormPartialDualMatrix"):
normEngn.buildEnergyNormPartialDualForm()
estimatorEnergyMatrix = (
normEngn.energyNormPartialDualMatrix)
else:
estimatorEnergyMatrix = normEngn
self.estimatorNormEngine = normEngine(estimatorEnergyMatrix)
def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \
-> Tuple[Np1D, Np1D, Np1D]:
- self.assembleReducedResidualBlocks(full = True)
+ self.assembleReducedResidualBlocks(full = rA is not None)
# 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0)
if rA is None: return ff
# 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2)
* rb.conj(), axis = 0)
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2)
* rA.conj(), axis = (0, 1))
return ff, Lf, LL
def errorEstimator(self, mus:Np1D) -> Np1D:
"""Standard residual-based error estimator."""
self.setupApprox()
- if hasattr(self.HFEngine, "buildA"): self.HFEngine.buildA()
- if hasattr(self.HFEngine, "buildb"): self.HFEngine.buildb()
+ checkIfAffine(self.HFEngine, "apply affinity-based error estimator")
+ self.HFEngine.buildA()
+ self.HFEngine.buildb()
mus = checkParameterList(mus, self.npar)[0]
vbMng(self.trainedModel, "INIT",
"Evaluating error estimator at mu = {}.".format(mus), 10)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
uApproxRs = self.getApproxReduced(mus)
muTestEff = mus ** self.HFEngine.rescalingExp
radiusA = np.empty((len(self.HFEngine.thAs), len(mus)),
dtype = np.complex)
radiusb = np.empty((len(self.HFEngine.thbs), len(mus)),
dtype = np.complex)
for j, thA in enumerate(self.HFEngine.thAs):
radiusA[j] = expressionEvaluator(thA[0], muTestEff)
for j, thb in enumerate(self.HFEngine.thbs):
radiusb[j] = expressionEvaluator(thb[0], muTestEff)
radiusA = np.expand_dims(uApproxRs.data, 1) * radiusA
ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA)
err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5
self.trainedModel.verbosity = verb
vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10)
return err
def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]:
"""
Compute maximum of (and index of maximum of) error estimator over given
parameters.
"""
errorEstTest = self.errorEstimator(mus)
idxMaxEst = [np.argmax(errorEstTest)]
return errorEstTest, idxMaxEst, errorEstTest[idxMaxEst]
def _isLastSampleCollinear(self) -> bool:
"""Check collinearity of last sample."""
if self.collinearityTol <= 0.: return False
if self.POD:
reff = self.samplingEngine.RPOD[:, -1]
else:
RROMPyWarning(("Repeated orthogonalization of the samples for "
"collinearity check. Consider setting POD to "
"True."))
if not hasattr(self, "_PODEngine"):
from rrompy.sampling.base.pod_engine import PODEngine
self._PODEngine = PODEngine(self.HFEngine)
reff = self._PODEngine.generalizedQR(self.samplingEngine.samples,
only_R = True,
is_state = True)[:, -1]
cLevel = np.abs(reff[-1]) / np.linalg.norm(reff)
vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 5)
return cLevel < self.collinearityTol
def greedyNextSample(self, muidx:int, plotEst : bool = False)\
-> Tuple[Np1D, int, float, paramVal]:
"""Compute next greedy snapshot of solution map."""
RROMPyAssert(self._mode, message = "Cannot add greedy sample.")
mus = copy(self.muTest[muidx])
self.muTest.pop(muidx)
for mu in mus:
vbMng(self, "MAIN",
("Adding sample point no. {} at {} to training "
"set.").format(self.samplingEngine.nsamples + 1, mu), 2)
self.mus.append(mu)
self._S = len(self.mus)
self._approxParameters["S"] = self.S
self.samplingEngine.nextSample(mu)
if self._isLastSampleCollinear():
RROMPyWarning("Collinearity above tolerance detected.")
errorEstTest = np.empty(len(self.muTest))
errorEstTest[:] = np.nan
return errorEstTest, [-1], np.nan, np.nan
errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator(
self.muTest)
if (plotEst and not np.any(np.isnan(errorEstTest))
and not np.any(np.isinf(errorEstTest))):
musre = copy(self.muTest.re.data)
from matplotlib import pyplot as plt
plt.figure()
errCP = copy(errorEstTest)
while len(musre) > 0:
if self.npar == 1:
currIdx = np.arange(len(musre))
else:
currIdx = np.where(np.isclose(np.sum(
np.abs(musre[:, 1 :] - musre[0, 1 :]), 1), 0.))[0]
plt.semilogy(musre[currIdx, 0], errCP[currIdx], 'k',
linewidth = 1)
musre = np.delete(musre, currIdx, 0)
errCP = np.delete(errCP, currIdx)
plt.semilogy([self.muTest.re(0, 0), self.muTest.re(-1, 0)],
[self.greedyTol] * 2, 'r--')
plt.semilogy(self.mus.re(0),
2. * self.greedyTol * np.ones(len(self.mus)), '*m')
plt.semilogy(self.muTest.re(muidx, 0), maxErrorEst, 'xr')
plt.grid()
plt.show()
plt.close()
return errorEstTest, muidx, maxErrorEst, self.muTest[muidx]
def _preliminaryTraining(self):
"""Initialize starting snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
self.computeScaleFactor()
self.resetSamples()
self.mus = self.trainSetGenerator.generatePoints(self.S)[
list(range(self.S))]
muTestBase = self.sampler.generatePoints(self.nTestPoints)
idxPop = pruneSamples(muTestBase ** self.HFEngine.rescalingExp,
self.mus ** self.HFEngine.rescalingExp,
1e-10 * self.scaleFactor[0])
muTestBase.pop(idxPop)
muTestBase = muTestBase.sort()
muLast = copy(self.mus[-1])
self.mus.pop()
if len(self.mus) > 0:
vbMng(self, "MAIN",
("Adding first {} sample point{} at {} to training "
"set.").format(self.S - 1, "" + "s" * (self.S > 2),
self.mus), 2)
self.samplingEngine.iterSample(self.mus)
self._S = len(self.mus)
self._approxParameters["S"] = self.S
self.muTest = emptyParameterList()
self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1]))
self.muTest[: -1] = muTestBase.data
self.muTest[-1] = muLast.data
def _enrichTestSet(self, nTest:int):
"""Add extra elements to test set."""
RROMPyAssert(self._mode, message = "Cannot enrich test set.")
muTestExtra = self.sampler.generatePoints(2 * nTest)
muTotal = copy(self.mus)
muTotal.append(self.muTest)
idxPop = pruneSamples(muTestExtra ** self.HFEngine.rescalingExp,
muTotal ** self.HFEngine.rescalingExp,
1e-10 * self.scaleFactor[0])
muTestExtra.pop(idxPop)
muTestNew = np.empty((len(self.muTest) + len(muTestExtra),
self.muTest.shape[1]), dtype = np.complex)
muTestNew[: len(self.muTest)] = self.muTest.data
muTestNew[len(self.muTest) :] = muTestExtra.data
self.muTest = checkParameterList(muTestNew, self.npar)[0].sort()
vbMng(self, "MAIN",
"Enriching test set by {} elements.".format(len(muTestExtra)), 5)
def greedy(self, plotEst : bool = False):
"""Compute greedy snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
vbMng(self, "INIT", "Starting computation of snapshots.", 2)
self._preliminaryTraining()
nTest = self.nTestPoints
muT0 = copy(self.muTest[-1])
errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(
[len(self.muTest) - 1], plotEst)
if np.any(np.isnan(maxErrorEst)):
RROMPyWarning(("Instability in a posteriori estimator. "
"Starting preemptive greedy loop termination."))
self.muTest.append(muT0)
self.mus.pop(-1)
self.samplingEngine.popSample()
self.setupApprox()
else:
- vbMng(self, "MAIN", ("Uniform testing error estimate "
- "{:.4e}.").format(np.max(maxErrorEst)), 2)
+ if maxErrorEst is not None:
+ max2ErrorEst = np.max(maxErrorEst)
+ vbMng(self, "MAIN", ("Uniform testing error estimate "
+ "{:.4e}.").format(max2ErrorEst), 2)
trainedModelOld = copy(self.trainedModel)
while (self.samplingEngine.nsamples < self.maxIter
- and np.max(maxErrorEst) > self.greedyTol):
+ and (maxErrorEst is None or max2ErrorEst > self.greedyTol)):
if (1. - self.refinementRatio) * nTest > len(self.muTest):
self._enrichTestSet(nTest)
nTest = len(self.muTest)
- muTestOld, maxErrorEstOld = self.muTest, np.max(maxErrorEst)
+ muTestOld = self.muTest
+ if maxErrorEst is not None:
+ max2ErrorEstOld = max2ErrorEst
errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(
muidx, plotEst)
- vbMng(self, "MAIN", ("Uniform testing error estimate "
- "{:.4e}.").format(np.max(maxErrorEst)), 2)
+ if maxErrorEst is not None:
+ max2ErrorEst = np.max(maxErrorEst)
+ vbMng(self, "MAIN", ("Uniform testing error estimate "
+ "{:.4e}.").format(max2ErrorEst), 2)
if (np.any(np.isnan(maxErrorEst))
or np.any(np.isinf(maxErrorEst))
- or maxErrorEstOld < (np.max(maxErrorEst)
- * self.TOL_INSTABILITY)):
+ or (maxErrorEst is not None
+ and max2ErrorEstOld < (max2ErrorEst * self.TOL_INSTABILITY))):
RROMPyWarning(("Instability in a posteriori estimator. "
"Starting preemptive greedy loop "
"termination."))
self.muTest = muTestOld
self._approxParameters = (
trainedModelOld.data.approxParameters)
self._S -= 1
self.mus.pop(-1)
self.samplingEngine.popSample()
self.trainedModel.data = copy(trainedModelOld.data)
break
trainedModelOld.data = copy(self.trainedModel.data)
- if (self.interactive and np.max(maxErrorEst) <= self.greedyTol):
+ if (self.interactive and maxErrorEst is not None
+ and max2ErrorEst <= self.greedyTol):
vbMng(self, "MAIN",
("Required tolerance {} achieved. Want to decrease "
"greedyTol and continue? "
"Y/N").format(self.greedyTol), 0, end = "")
increasemaxIter = input()
if increasemaxIter.upper() == "Y":
vbMng(self, "MAIN", "Reducing value of greedyTol...",
0)
- while np.max(maxErrorEst) <= self._greedyTol:
+ while max2ErrorEst <= self._greedyTol:
self._greedyTol *= .5
if (self.interactive
and self.samplingEngine.nsamples >= self.maxIter):
vbMng(self, "MAIN",
("Maximum number of iterations {} reached. Want to "
"increase maxIter and continue? "
"Y/N").format(self.maxIter), 0, end = "")
increasemaxIter = input()
if increasemaxIter.upper() == "Y":
vbMng(self, "MAIN", "Doubling value of maxIter...", 0)
self._maxIter *= 2
vbMng(self, "DEL",
("Done computing snapshots (final snapshot count: "
"{}).").format(self.samplingEngine.nsamples), 2)
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return (super().checkComputedApprox()
and len(self.mus) == self.trainedModel.data.projMat.shape[1])
def assembleReducedResidualGramian(self, pMat:sampList):
"""
Build residual gramian of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
if (not hasattr(self.trainedModel.data, "gramian")
or self.trainedModel.data.gramian is None):
gramian = self.estimatorNormEngine.innerProduct(pMat, pMat)
else:
Sold = self.trainedModel.data.gramian.shape[0]
S = len(self.mus)
if Sold > S:
gramian = self.trainedModel.data.gramian[: S, : S]
else:
idxOld = list(range(Sold))
idxNew = list(range(Sold, S))
gramian = np.empty((S, S), dtype = np.complex)
gramian[: Sold, : Sold] = self.trainedModel.data.gramian
gramian[: Sold, Sold :] = (
self.estimatorNormEngine.innerProduct(pMat(idxNew),
pMat(idxOld)))
gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj()
gramian[Sold :, Sold :] = (
self.estimatorNormEngine.innerProduct(pMat(idxNew),
pMat(idxNew)))
self.trainedModel.data.gramian = gramian
def assembleReducedResidualBlocksbb(self, bs:List[Np1D]):
"""
Build blocks (of type bb) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nbs = len(bs)
if (not hasattr(self.trainedModel.data, "resbb")
or self.trainedModel.data.resbb is None):
resbb = np.empty((nbs, nbs), dtype = np.complex)
for i in range(nbs):
Mbi = bs[i]
resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi)
for j in range(i):
Mbj = bs[j]
resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj,
Mbi)
for i in range(nbs):
for j in range(i + 1, nbs):
resbb[i, j] = resbb[j, i].conj()
self.trainedModel.data.resbb = resbb
def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D],
pMat:sampList):
"""
Build blocks (of type Ab) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nAs = len(As)
nbs = len(bs)
S = len(self.mus)
if (not hasattr(self.trainedModel.data, "resAb")
or self.trainedModel.data.resAb is None):
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAb = np.empty((nbs, S, nAs), dtype = np.complex)
for j in range(nAs):
MAj = dot(As[j], pMat)
for i in range(nbs):
Mbi = bs[i]
resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj,
Mbi)
else:
Sold = self.trainedModel.data.resAb.shape[1]
if Sold == S: return
if Sold > S:
resAb = self.trainedModel.data.resAb[:, : S, :]
else:
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAb = np.empty((nbs, S, nAs), dtype = np.complex)
resAb[:, : Sold, :] = self.trainedModel.data.resAb
for j in range(nAs):
MAj = dot(As[j], pMat[:, Sold :])
for i in range(nbs):
Mbi = bs[i]
resAb[i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj, Mbi))
self.trainedModel.data.resAb = resAb
def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList):
"""
Build blocks (of type AA) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nAs = len(As)
S = len(self.mus)
if (not hasattr(self.trainedModel.data, "resAA")
or self.trainedModel.data.resAA is None):
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAA = np.empty((S, nAs, S, nAs), dtype = np.complex)
for i in range(nAs):
MAi = dot(As[i], pMat)
resAA[:, i, :, i] = (
self.estimatorNormEngine.innerProduct(MAi, MAi))
for j in range(i):
MAj = dot(As[j], pMat)
resAA[:, i, :, j] = (
self.estimatorNormEngine.innerProduct(MAj, MAi))
for i in range(nAs):
for j in range(i + 1, nAs):
resAA[:, i, :, j] = resAA[:, j, :, i].T.conj()
else:
Sold = self.trainedModel.data.resAA.shape[0]
if Sold == S: return
if Sold > S:
resAA = self.trainedModel.data.resAA[: S, :, : S, :]
else:
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAA = np.empty((S, nAs, S, nAs), dtype = np.complex)
resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA
for i in range(nAs):
MAi = dot(As[i], pMat)
resAA[: Sold, i, Sold :, i] = (
self.estimatorNormEngine.innerProduct(MAi[:, Sold :],
MAi[:, : Sold]))
resAA[Sold :, i, : Sold, i] = resAA[: Sold, i,
Sold :, i].T.conj()
resAA[Sold :, i, Sold :, i] = (
self.estimatorNormEngine.innerProduct(MAi[:, Sold :],
MAi[:, Sold :]))
for j in range(i):
MAj = dot(As[j], pMat)
resAA[: Sold, i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, Sold :],
MAi[:, : Sold]))
resAA[Sold :, i, : Sold, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, : Sold],
MAi[:, Sold :]))
resAA[Sold :, i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, Sold :],
MAi[:, Sold :]))
for i in range(nAs):
for j in range(i + 1, nAs):
resAA[: Sold, i, Sold :, j] = (
resAA[Sold :, j, : Sold, i].T.conj())
resAA[Sold :, i, : Sold, j] = (
resAA[: Sold, j, Sold :, i].T.conj())
resAA[Sold :, i, Sold :, j] = (
resAA[Sold :, j, Sold :, i].T.conj())
self.trainedModel.data.resAA = resAA
def assembleReducedResidualBlocks(self, full : bool = False):
"""Build affine blocks of affine decomposition of residual."""
- if hasattr(self.HFEngine, "buildb"): self.HFEngine.buildb()
+ if full:
+ checkIfAffine(self.HFEngine, "assemble reduced residual blocks")
+ else:
+ checkIfAffine(self.HFEngine, "assemble reduced RHS blocks", True)
+ self.HFEngine.buildb()
self.assembleReducedResidualBlocksbb(self.HFEngine.bs)
if full:
pMat = self.samplingEngine.samples
- if hasattr(self.HFEngine, "buildA"): self.HFEngine.buildA()
+ self.HFEngine.buildA()
self.assembleReducedResidualBlocksAb(self.HFEngine.As,
self.HFEngine.bs, pMat)
self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat)
diff --git a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py
index b29eb32..4d67e8a 100644
--- a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py
+++ b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py
@@ -1,185 +1,184 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
from .generic_greedy_approximant import GenericGreedyApproximant
from rrompy.reduction_methods.standard import ReducedBasis
from rrompy.utilities.base.types import DictAny, HFEng, paramVal
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import dot
from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyAssert
__all__ = ['ReducedBasisGreedy']
class ReducedBasisGreedy(GenericGreedyApproximant, ReducedBasis):
"""
ROM greedy RB approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
defaults to 0.;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to sampler.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults and must
be True.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
collinearityTol: Collinearity tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
As: List of sparse matrices (in CSC format) representing coefficients
of linear system matrix.
bs: List of numpy vectors representing coefficients of linear system
RHS.
ARBs: List of sparse matrices (in CSC format) representing coefficients
of compressed linear system matrix.
bRBs: List of numpy vectors representing coefficients of compressed
linear system RHS.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = True,
verbosity : int = 10, timestamp : bool = True):
if not approx_state: RROMPyWarning("Overriding approx_state to True.")
self._preInit()
self._addParametersToList([], [], toBeExcluded = ["R", "PODTolerance"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = True, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
@property
def R(self):
"""Value of R."""
self._R = self._S
return self._R
@R.setter
def R(self, R):
RROMPyWarning(("R is used just to simplify inheritance, and its value "
"cannot be changed from that of S."))
@property
def PODTolerance(self):
"""Value of PODTolerance."""
self._PODTolerance = -1
return self._PODTolerance
@PODTolerance.setter
def PODTolerance(self, PODTolerance):
RROMPyWarning(("PODTolerance is used just to simplify inheritance, "
"and its value cannot be changed from -1."))
def setupApprox(self, plotEst : bool = False):
"""Compute RB projection matrix."""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.greedy(plotEst)
vbMng(self, "INIT", "Computing projection matrix.", 7)
pMat = self.samplingEngine.samples.data
pMatEff = dot(self.HFEngine.C, pMat)
if self.trainedModel is None:
self.trainedModel = self.tModelType()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
datadict = {"mu0": self.mu0, "projMat": pMatEff,
"scaleFactor": self.scaleFactor,
"rescalingExp": self.HFEngine.rescalingExp}
data = self.initializeModelData(datadict)[0]
ARBs, bRBs = self.assembleReducedSystem(pMat)
data.affinePoly = self.HFEngine.affinePoly
- if hasattr(self.HFEngine, "buildA"): self.HFEngine.buildA()
- if hasattr(self.HFEngine, "buildb"): self.HFEngine.buildb()
+ self.HFEngine.buildA()
+ self.HFEngine.buildb()
data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
Sold = self.trainedModel.data.projMat.shape[1]
ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :],
pMat[:, : Sold])
self.trainedModel.data.projMat = copy(pMatEff)
self.trainedModel.data.mus = copy(self.mus)
self.trainedModel.data.ARBs = ARBs
self.trainedModel.data.bRBs = bRBs
vbMng(self, "DEL", "Done computing projection matrix.", 7)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
-
diff --git a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py
index ce7e8c4..37bb2e8 100644
--- a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py
+++ b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py
@@ -1,539 +1,537 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
from rrompy.reduction_methods.base.generic_approximant import (
GenericApproximant)
from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.poly_fitting.moving_least_squares import (
polybases as mlspb,
MovingLeastSquaresInterpolator as MLSI)
from rrompy.sampling.pivoted import (SamplingEnginePivoted,
SamplingEnginePivotedPOD)
from rrompy.utilities.base.types import ListAny, DictAny, HFEng, paramVal
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import (fullDegreeN, totalDegreeN,
nextDerivativeIndices)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import emptyParameterList
__all__ = ['GenericPivotedApproximant']
class GenericPivotedApproximant(GenericApproximant):
"""
ROM pivoted approximant (with pole matching) computation for parametric
problems (ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
directionPivot(optional): Pivot components. Defaults to [0].
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'matchingWeight': weight for pole matching optimization; defaults
to 1;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
defaults to np.inf;
- 'cutOffType': rule for tolerance computation for parasitic poles;
defaults to 'MAGNITUDE';
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV'
and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'MMarginal': degree of marginal interpolant; defaults to 0;
- 'polydegreetypeMarginal': type of polynomial degree for marginal;
defaults to 'TOTAL';
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant; defaults to 0, i.e. identity;
- 'nNearestNeighborMarginal': number of marginal nearest neighbors
considered if polybasisMarginal allows; defaults to -1;
- 'interpRcondMarginal': tolerance for marginal interpolation;
defaults to None.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
directionPivot: Pivot components.
mus: Array of snapshot parameters.
musPivot: Array of pivot snapshot parameters.
musMarginal: Array of marginal snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'matchingWeight': weight for pole matching optimization;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
- 'cutOffType': rule for tolerance computation for parasitic poles;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation;
- 'MMarginal': degree of marginal interpolant;
- 'polydegreetypeMarginal': type of polynomial degree for marginal;
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant;
- 'nNearestNeighborMarginal': number of marginal nearest neighbors
considered if polybasisMarginal allows;
- 'interpRcondMarginal': tolerance for marginal interpolation.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
matchingWeight: Weight for pole matching optimization.
cutOffTolerance: Tolerance for ignoring parasitic poles.
cutOffType: Rule for tolerance computation for parasitic poles.
S: Total number of pivot samples current approximant relies upon.
samplerPivot: Pivot sample point generator.
SMarginal: Total number of marginal samples current approximant relies
upon.
samplerMarginal: Marginal sample point generator.
polybasisMarginal: Type of polynomial basis for marginal interpolation.
MMarginal: Degree of marginal interpolant.
polydegreetypeMarginal: Type of polynomial degree for marginal.
radialDirectionalWeightsMarginal: Radial basis weights for marginal
interpolant.
nNearestNeighborMarginal: Number of marginal nearest neighbors
considered if polybasisMarginal allows.
interpRcondMarginal: Tolerance for marginal interpolation.
muBoundsPivot: list of bounds for pivot parameter values.
muBoundsMarginal: list of bounds for marginal parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
directionPivot : ListAny = [0],
approxParameters : DictAny = {}, approx_state : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
if len(directionPivot) > 1:
raise RROMPyException(("Exactly 1 pivot parameter allowed in pole "
"matching."))
from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
QSBase = QS([[0], [1]], "UNIFORM")
self._addParametersToList(["matchingWeight", "cutOffTolerance",
"cutOffType", "polybasisMarginal",
"MMarginal", "polydegreetypeMarginal",
"radialDirectionalWeightsMarginal",
"nNearestNeighborMarginal",
"interpRcondMarginal"],
[1, np.inf, "MAGNITUDE", "MONOMIAL", 0,
"TOTAL", 1, -1, -1],
["samplerPivot", "SMarginal",
"samplerMarginal"], [QSBase, [1], QSBase])
del QS
self._directionPivot = directionPivot
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = approx_state, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
def setupSampling(self):
"""Setup sampling engine."""
RROMPyAssert(self._mode, message = "Cannot setup sampling engine.")
if not hasattr(self, "_POD") or self._POD is None: return
if self.POD:
SamplingEngine = SamplingEnginePivotedPOD
else:
SamplingEngine = SamplingEnginePivoted
self.samplingEngine = SamplingEngine(self.HFEngine,
self.directionPivot,
sample_state = self.approx_state,
verbosity = self.verbosity)
@property
def matchingWeight(self):
"""Value of matchingWeight."""
return self._matchingWeight
@matchingWeight.setter
def matchingWeight(self, matchingWeight):
self._matchingWeight = matchingWeight
self._approxParameters["matchingWeight"] = self.matchingWeight
@property
def cutOffTolerance(self):
"""Value of cutOffTolerance."""
return self._cutOffTolerance
@cutOffTolerance.setter
def cutOffTolerance(self, cutOffTolerance):
self._cutOffTolerance = cutOffTolerance
self._approxParameters["cutOffTolerance"] = self.cutOffTolerance
@property
def cutOffType(self):
"""Value of cutOffType."""
return self._cutOffType
@cutOffType.setter
def cutOffType(self, cutOffType):
try:
cutOffType = cutOffType.upper().strip().replace(" ","")
if cutOffType not in ["MAGNITUDE", "POTENTIAL"]:
raise RROMPyException("Prescribed cutOffType not recognized.")
self._cutOffType = cutOffType
except:
RROMPyWarning(("Prescribed cutOffType not recognized. Overriding "
"to 'MAGNITUDE'."))
self._cutOffType = "MAGNITUDE"
self._approxParameters["cutOffType"] = self.cutOffType
@property
def SMarginal(self):
"""Value of SMarginal."""
return self._SMarginal
@SMarginal.setter
def SMarginal(self, SMarginal):
if SMarginal <= 0:
raise RROMPyException("SMarginal must be positive.")
if hasattr(self, "_SMarginal") and self._SMarginal is not None:
Sold = self.SMarginal
else: Sold = -1
self._SMarginal = SMarginal
self._approxParameters["SMarginal"] = self.SMarginal
if Sold != self.SMarginal: self.resetSamples()
@property
def polybasisMarginal(self):
"""Value of polybasisMarginal."""
return self._polybasisMarginal
@polybasisMarginal.setter
def polybasisMarginal(self, polybasisMarginal):
try:
polybasisMarginal = polybasisMarginal.upper().strip().replace(" ",
"")
if polybasisMarginal not in ppb + rbpb + mlspb:
raise RROMPyException(
"Prescribed marginal polybasis not recognized.")
self._polybasisMarginal = polybasisMarginal
except:
RROMPyWarning(("Prescribed marginal polybasis not recognized. "
"Overriding to 'MONOMIAL'."))
self._polybasisMarginal = "MONOMIAL"
self._approxParameters["polybasisMarginal"] = self.polybasisMarginal
@property
def MMarginal(self):
"""Value of MMarginal."""
return self._MMarginal
@MMarginal.setter
def MMarginal(self, MMarginal):
if MMarginal < 0:
raise RROMPyException("MMarginal must be non-negative.")
self._MMarginal = MMarginal
self._approxParameters["MMarginal"] = self.MMarginal
@property
def polydegreetypeMarginal(self):
"""Value of polydegreetypeMarginal."""
return self._polydegreetypeMarginal
@polydegreetypeMarginal.setter
def polydegreetypeMarginal(self, polydegreetypeM):
try:
polydegreetypeM = polydegreetypeM.upper().strip().replace(" ","")
if polydegreetypeM not in ["TOTAL", "FULL"]:
raise RROMPyException(("Prescribed polydegreetypeMarginal not "
"recognized."))
self._polydegreetypeMarginal = polydegreetypeM
except:
RROMPyWarning(("Prescribed polydegreetypeMarginal not recognized. "
"Overriding to 'TOTAL'."))
self._polydegreetypeMarginal = "TOTAL"
self._approxParameters["polydegreetypeMarginal"] = (
self.polydegreetypeMarginal)
@property
def radialDirectionalWeightsMarginal(self):
"""Value of radialDirectionalWeightsMarginal."""
return self._radialDirectionalWeightsMarginal
@radialDirectionalWeightsMarginal.setter
def radialDirectionalWeightsMarginal(self, radialDirWeightsMarginal):
self._radialDirectionalWeightsMarginal = radialDirWeightsMarginal
self._approxParameters["radialDirectionalWeightsMarginal"] = (
self.radialDirectionalWeightsMarginal)
@property
def nNearestNeighborMarginal(self):
"""Value of nNearestNeighborMarginal."""
return self._nNearestNeighborMarginal
@nNearestNeighborMarginal.setter
def nNearestNeighborMarginal(self, nNearestNeighborMarginal):
self._nNearestNeighborMarginal = nNearestNeighborMarginal
self._approxParameters["nNearestNeighborMarginal"] = (
self.nNearestNeighborMarginal)
@property
def interpRcondMarginal(self):
"""Value of interpRcondMarginal."""
return self._interpRcondMarginal
@interpRcondMarginal.setter
def interpRcondMarginal(self, interpRcondMarginal):
self._interpRcondMarginal = interpRcondMarginal
self._approxParameters["interpRcondMarginal"] = (
self.interpRcondMarginal)
@property
def directionPivot(self):
"""Value of directionPivot. Its assignment may reset snapshots."""
return self._directionPivot
@directionPivot.setter
def directionPivot(self, directionPivot):
if hasattr(self, '_directionPivot'):
directionPivotOld = copy(self.directionPivot)
else:
directionPivotOld = None
if (directionPivotOld is None
or len(directionPivot) != len(directionPivotOld)
or not directionPivot == directionPivotOld):
self.resetSamples()
self._directionPivot = directionPivot
@property
def directionMarginal(self):
return [x for x in range(self.HFEngine.npar) \
if x not in self.directionPivot]
@property
def nparPivot(self):
return len(self.directionPivot)
@property
def nparMarginal(self):
return self.npar - self.nparPivot
@property
def rescalingExpPivot(self):
return [self.HFEngine.rescalingExp[x] for x in self.directionPivot]
@property
def rescalingExpMarginal(self):
return [self.HFEngine.rescalingExp[x] for x in self.directionMarginal]
@property
def muBoundsPivot(self):
"""Value of muBoundsPivot."""
return self.samplerPivot.lims
@property
def muBoundsMarginal(self):
"""Value of muBoundsMarginal."""
return self.samplerMarginal.lims
@property
def samplerPivot(self):
"""Value of samplerPivot."""
return self._samplerPivot
@samplerPivot.setter
def samplerPivot(self, samplerPivot):
if 'generatePoints' not in dir(samplerPivot):
raise RROMPyException("Pivot sampler type not recognized.")
if hasattr(self, '_samplerPivot') and self._samplerPivot is not None:
samplerOld = self.samplerPivot
self._samplerPivot = samplerPivot
self._approxParameters["samplerPivot"] = self.samplerPivot.__str__()
if not 'samplerOld' in locals() or samplerOld != self.samplerPivot:
self.resetSamples()
@property
def samplerMarginal(self):
"""Value of samplerMarginal."""
return self._samplerMarginal
@samplerMarginal.setter
def samplerMarginal(self, samplerMarginal):
if 'generatePoints' not in dir(samplerMarginal):
raise RROMPyException("Marginal sampler type not recognized.")
if (hasattr(self, '_samplerMarginal')
and self._samplerMarginal is not None):
samplerOld = self.samplerMarginal
self._samplerMarginal = samplerMarginal
self._approxParameters["samplerMarginal"] = (
self.samplerMarginal.__str__())
if not 'samplerOld' in locals() or samplerOld != self.samplerMarginal:
self.resetSamples()
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musMUniqueCN = None
self._derMIdxs = None
self._reorderM = None
def setSamples(self, samplingEngine):
"""Copy samplingEngine and samples."""
super().setSamples(samplingEngine)
self.mus = copy(self.samplingEngine[0].mus)
for sEj in self.samplingEngine[1:]:
self.mus.append(sEj.mus)
def computeSnapshots(self):
"""Compute snapshots of solution map."""
RROMPyAssert(self._mode,
message = "Cannot start snapshot computation.")
if self.samplingEngine.nsamplesTot != self.S * self.SMarginal:
self.computeScaleFactor()
self.resetSamples()
vbMng(self, "INIT", "Starting computation of snapshots.", 5)
- if hasattr(self.HFEngine, "buildA"): self.HFEngine.buildA()
- if hasattr(self.HFEngine, "buildb"): self.HFEngine.buildb()
self.musPivot = self.samplerPivot.generatePoints(self.S)
self.musMarginal = self.samplerMarginal.generatePoints(
self.SMarginal)
self.mus = emptyParameterList()
self.mus.reset((self.S * self.SMarginal, self.HFEngine.npar))
self.samplingEngine.resetHistory(self.SMarginal)
for j, muMarg in enumerate(self.musMarginal):
for k in range(j * self.S, (j + 1) * self.S):
self.mus.data[k, self.directionPivot] = (
self.musPivot[k - j * self.S].data)
self.mus.data[k, self.directionMarginal] = muMarg.data
self.samplingEngine.iterSample(self.musPivot, self.musMarginal)
if self.POD:
self.samplingEngine.coalesceSamples(self.interpRcondMarginal)
else:
self.samplingEngine.coalesceSamples()
vbMng(self, "DEL", "Done computing snapshots.", 5)
def _setupMarginalInterpolationIndices(self):
"""Setup parameters for polyvander."""
RROMPyAssert(self._mode,
message = "Cannot setup interpolation indices.")
if (self._musMUniqueCN is None
or len(self._reorderM) != len(self.musMarginal)):
self._musMUniqueCN, musMIdxsTo, musMIdxs, musMCount = (
self.trainedModel.centerNormalizeMarginal(self.musMarginal)\
.unique(return_index = True, return_inverse = True,
return_counts = True))
self._musMUnique = self.musMarginal[musMIdxsTo]
self._derMIdxs = [None] * len(self._musMUniqueCN)
self._reorderM = np.empty(len(musMIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musMCount):
self._derMIdxs[j] = nextDerivativeIndices([],
self.nparMarginal, cnt)
jIdx = np.nonzero(musMIdxs == j)[0]
self._reorderM[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupMarginalInterp(self):
"""Compute marginal interpolator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
vbMng(self, "INIT", "Starting computation of marginal interpolator.",
7)
self._setupMarginalInterpolationIndices()
if self.polydegreetypeMarginal == "TOTAL":
cfun = totalDegreeN
else:
cfun = fullDegreeN
MM = copy(self.MMarginal)
while len(self.musMarginal) < cfun(MM, self.nparMarginal): MM -= 1
if MM < self.MMarginal:
RROMPyWarning(
("MMarginal too large compared to SMarginal. "
"Reducing MMarginal by {}").format(self.MMarginal - MM))
self.MMarginal = MM
mI = []
for j in range(len(self.musMarginal)):
canonicalj = 1. * (np.arange(len(self.musMarginal)) == j)
self._MMarginal = MM
while self.MMarginal >= 0:
if self.polybasisMarginal in ppb:
p = PI()
wellCond, msg = p.setupByInterpolation(
self._musMUniqueCN, canonicalj, self.MMarginal,
self.polybasisMarginal, self.verbosity >= 5,
self.polydegreetypeMarginal == "TOTAL",
{"derIdxs": self._derMIdxs,
"reorder": self._reorderM,
"scl": np.power(self.scaleFactorMarginal, -1.)},
{"rcond": self.interpRcondMarginal})
elif self.polybasisMarginal in rbpb:
p = RBI()
wellCond, msg = p.setupByInterpolation(
self._musMUniqueCN, canonicalj, self.MMarginal,
self.polybasisMarginal,
self.radialDirectionalWeightsMarginal,
self.verbosity >= 5,
self.polydegreetypeMarginal == "TOTAL",
{"derIdxs": self._derMIdxs,
"reorder": self._reorderM,
"scl": np.power(self.scaleFactorMarginal, -1.),
"nNearestNeighbor" : self.nNearestNeighborMarginal},
{"rcond": self.interpRcondMarginal})
else:# if self.polybasisMarginal in mlspb:
p = MLSI()
wellCond, msg = p.setupByInterpolation(
self._musMUniqueCN, canonicalj, self.MMarginal,
self.polybasisMarginal,
self.radialDirectionalWeightsMarginal,
self.verbosity >= 5,
self.polydegreetypeMarginal == "TOTAL",
{"derIdxs": self._derMIdxs,
"reorder": self._reorderM,
"scl": np.power(self.scaleFactorMarginal, -1.),
"nNearestNeighbor" : self.nNearestNeighborMarginal})
vbMng(self, "MAIN", msg, 5)
if wellCond: break
RROMPyWarning(("Polyfit is poorly conditioned. Reducing "
"MMarginal by 1."))
self.MMarginal = self.MMarginal - 1
mI = mI + [copy(p)]
vbMng(self, "DEL", "Done computing marginal interpolator.", 7)
return mI
def computeScaleFactor(self):
"""Compute parameter rescaling factor."""
RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.")
self.scaleFactorPivot = .5 * np.abs(
self.muBoundsPivot[0] ** self.rescalingExpPivot
- self.muBoundsPivot[1] ** self.rescalingExpPivot)
self.scaleFactorMarginal = .5 * np.abs(
self.muBoundsMarginal[0] ** self.rescalingExpMarginal
- self.muBoundsMarginal[1] ** self.rescalingExpMarginal)
self.scaleFactor = np.empty(self.npar)
self.scaleFactor[self.directionPivot] = self.scaleFactorPivot
self.scaleFactor[self.directionMarginal] = self.scaleFactorMarginal