diff --git a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py
index 16c10de..5463780 100644
--- a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py
+++ b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py
@@ -1,705 +1,669 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.hfengines.base.linear_affine_engine import checkIfAffine
from rrompy.reduction_methods.standard.generic_standard_approximant \
import GenericStandardApproximant
from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple,
List, normEng, paramVal, paramList,
sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import dot
from rrompy.utilities.expression import expressionEvaluator
from rrompy.solver import normEngine
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList, emptyParameterList
__all__ = ['GenericGreedyApproximant']
def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D:
return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)])
- badmus[..., np.newaxis].T, axis = 1)
def pruneSamples(mus:paramList, badmus:paramList,
tol : float = 1e-8) -> Np1D:
"""Remove from mus all the elements which are too close to badmus."""
if len(badmus) == 0: return mus
proximity = np.min(localL2Distance(mus.data, badmus.data), axis = 1)
return np.arange(len(mus))[proximity <= tol]
class GenericGreedyApproximant(GenericStandardApproximant):
"""
ROM greedy interpolant computation for parametric problems
(ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
defaults to 0.;
- - 'interactive': whether to interactively terminate greedy
- algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to sampler.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
- - 'interactive': whether to interactively terminate greedy
- algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: Uniform error tolerance for greedy algorithm.
collinearityTol: Collinearity tolerance for greedy algorithm.
- interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
- self._addParametersToList(["greedyTol", "collinearityTol",
- "interactive", "maxIter", "refinementRatio",
- "nTestPoints"],
- [1e-2, 0., False, 1e2, .2, 5e2],
+ self._addParametersToList(["greedyTol", "collinearityTol", "maxIter",
+ "refinementRatio", "nTestPoints"],
+ [1e-2, 0., 1e2, .2, 5e2],
["trainSetGenerator"], ["AUTO"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = approx_state, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
@property
def greedyTol(self):
"""Value of greedyTol."""
return self._greedyTol
@greedyTol.setter
def greedyTol(self, greedyTol):
if greedyTol < 0:
raise RROMPyException("greedyTol must be non-negative.")
if hasattr(self, "_greedyTol") and self.greedyTol is not None:
greedyTolold = self.greedyTol
else:
greedyTolold = -1
self._greedyTol = greedyTol
self._approxParameters["greedyTol"] = self.greedyTol
if greedyTolold != self.greedyTol:
self.resetSamples()
@property
def collinearityTol(self):
"""Value of collinearityTol."""
return self._collinearityTol
@collinearityTol.setter
def collinearityTol(self, collinearityTol):
if collinearityTol < 0:
raise RROMPyException("collinearityTol must be non-negative.")
if (hasattr(self, "_collinearityTol")
and self.collinearityTol is not None):
collinearityTolold = self.collinearityTol
else:
collinearityTolold = -1
self._collinearityTol = collinearityTol
self._approxParameters["collinearityTol"] = self.collinearityTol
if collinearityTolold != self.collinearityTol:
self.resetSamples()
- @property
- def interactive(self):
- """Value of interactive."""
- return self._interactive
- @interactive.setter
- def interactive(self, interactive):
- self._interactive = interactive
-
@property
def maxIter(self):
"""Value of maxIter."""
return self._maxIter
@maxIter.setter
def maxIter(self, maxIter):
if maxIter <= 0: raise RROMPyException("maxIter must be positive.")
if hasattr(self, "_maxIter") and self.maxIter is not None:
maxIterold = self.maxIter
else:
maxIterold = -1
self._maxIter = maxIter
self._approxParameters["maxIter"] = self.maxIter
if maxIterold != self.maxIter:
self.resetSamples()
@property
def refinementRatio(self):
"""Value of refinementRatio."""
return self._refinementRatio
@refinementRatio.setter
def refinementRatio(self, refinementRatio):
if refinementRatio <= 0. or refinementRatio > 1.:
raise RROMPyException(("refinementRatio must be between 0 "
"(excluded) and 1."))
if (hasattr(self, "_refinementRatio")
and self.refinementRatio is not None):
refinementRatioold = self.refinementRatio
else:
refinementRatioold = -1
self._refinementRatio = refinementRatio
self._approxParameters["refinementRatio"] = self.refinementRatio
if refinementRatioold != self.refinementRatio:
self.resetSamples()
@property
def nTestPoints(self):
"""Value of nTestPoints."""
return self._nTestPoints
@nTestPoints.setter
def nTestPoints(self, nTestPoints):
if nTestPoints <= 0:
raise RROMPyException("nTestPoints must be positive.")
if not np.isclose(nTestPoints, np.int(nTestPoints)):
raise RROMPyException("nTestPoints must be an integer.")
nTestPoints = np.int(nTestPoints)
if hasattr(self, "_nTestPoints") and self.nTestPoints is not None:
nTestPointsold = self.nTestPoints
else:
nTestPointsold = -1
self._nTestPoints = nTestPoints
self._approxParameters["nTestPoints"] = self.nTestPoints
if nTestPointsold != self.nTestPoints:
self.resetSamples()
@property
def trainSetGenerator(self):
"""Value of trainSetGenerator."""
return self._trainSetGenerator
@trainSetGenerator.setter
def trainSetGenerator(self, trainSetGenerator):
if (isinstance(trainSetGenerator, (str,))
and trainSetGenerator.upper() == "AUTO"):
trainSetGenerator = self.sampler
if 'generatePoints' not in dir(trainSetGenerator):
raise RROMPyException("trainSetGenerator type not recognized.")
if (hasattr(self, '_trainSetGenerator')
and self.trainSetGenerator not in [None, "AUTO"]):
trainSetGeneratorOld = self.trainSetGenerator
self._trainSetGenerator = trainSetGenerator
self._approxParameters["trainSetGenerator"] = self.trainSetGenerator
if (not 'trainSetGeneratorOld' in locals()
or trainSetGeneratorOld != self.trainSetGenerator):
self.resetSamples()
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._mus = emptyParameterList()
def initEstimatorNormEngine(self, normEngn : normEng = None):
"""Initialize estimator norm engine."""
if (normEngn is not None or not hasattr(self, "estimatorNormEngine")
or self.estimatorNormEngine is None):
if normEngn is None:
if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"):
self.HFEngine.buildEnergyNormPartialDualForm()
estimatorEnergyMatrix = (
self.HFEngine.energyNormPartialDualMatrix)
else:
if hasattr(normEngn, "buildEnergyNormPartialDualForm"):
if not hasattr(normEngn, "energyNormPartialDualMatrix"):
normEngn.buildEnergyNormPartialDualForm()
estimatorEnergyMatrix = (
normEngn.energyNormPartialDualMatrix)
else:
estimatorEnergyMatrix = normEngn
self.estimatorNormEngine = normEngine(estimatorEnergyMatrix)
def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \
-> Tuple[Np1D, Np1D, Np1D]:
self.assembleReducedResidualBlocks(full = rA is not None)
# 'ij,jk,ik->k', resbb, radiusb, radiusb.conj()
ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0)
if rA is None: return ff
# 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj()
Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2)
* rb.conj(), axis = 0)
# 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj()
LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2)
* rA.conj(), axis = (0, 1))
return ff, Lf, LL
def errorEstimator(self, mus:Np1D) -> Np1D:
"""Standard residual-based error estimator."""
self.setupApprox()
checkIfAffine(self.HFEngine, "apply affinity-based error estimator")
self.HFEngine.buildA()
self.HFEngine.buildb()
mus = checkParameterList(mus, self.npar)[0]
vbMng(self.trainedModel, "INIT",
"Evaluating error estimator at mu = {}.".format(mus), 10)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
uApproxRs = self.getApproxReduced(mus)
muTestEff = mus ** self.HFEngine.rescalingExp
radiusA = np.empty((len(self.HFEngine.thAs), len(mus)),
dtype = np.complex)
radiusb = np.empty((len(self.HFEngine.thbs), len(mus)),
dtype = np.complex)
for j, thA in enumerate(self.HFEngine.thAs):
radiusA[j] = expressionEvaluator(thA[0], muTestEff)
for j, thb in enumerate(self.HFEngine.thbs):
radiusb[j] = expressionEvaluator(thb[0], muTestEff)
radiusA = np.expand_dims(uApproxRs.data, 1) * radiusA
ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA)
err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5
self.trainedModel.verbosity = verb
vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10)
return err
def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]:
"""
Compute maximum of (and index of maximum of) error estimator over given
parameters.
"""
errorEstTest = self.errorEstimator(mus)
idxMaxEst = [np.argmax(errorEstTest)]
return errorEstTest, idxMaxEst, errorEstTest[idxMaxEst]
def _isLastSampleCollinear(self) -> bool:
"""Check collinearity of last sample."""
if self.collinearityTol <= 0.: return False
if self.POD:
reff = self.samplingEngine.RPOD[:, -1]
else:
RROMPyWarning(("Repeated orthogonalization of the samples for "
"collinearity check. Consider setting POD to "
"True."))
if not hasattr(self, "_PODEngine"):
from rrompy.sampling.base.pod_engine import PODEngine
self._PODEngine = PODEngine(self.HFEngine)
reff = self._PODEngine.generalizedQR(self.samplingEngine.samples,
only_R = True,
is_state = True)[:, -1]
cLevel = np.abs(reff[-1]) / np.linalg.norm(reff)
cLevel = np.inf if np.isclose(cLevel, 0.) else cLevel ** -1.
vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 5)
return cLevel > self.collinearityTol
def greedyNextSample(self, muidx:int, plotEst : bool = False)\
-> Tuple[Np1D, int, float, paramVal]:
"""Compute next greedy snapshot of solution map."""
RROMPyAssert(self._mode, message = "Cannot add greedy sample.")
mus = copy(self.muTest[muidx])
self.muTest.pop(muidx)
for j, mu in enumerate(mus):
vbMng(self, "MAIN",
("Adding sample point no. {} at {} to training "
"set.").format(len(self.mus) + 1, mu), 2)
self.mus.append(mu)
self._S = len(self.mus)
self._approxParameters["S"] = self.S
if not np.allclose(mu, self.samplingEngine.mus.data[j - len(mus)]):
self.samplingEngine.nextSample(mu)
if self._isLastSampleCollinear():
vbMng(self, "MAIN",
("Collinearity above tolerance detected. Starting "
"preemptive greedy loop termination."), 2)
self._collinearityFlag = 1
errorEstTest = np.empty(len(self.muTest))
errorEstTest[:] = np.nan
return errorEstTest, [-1], np.nan, np.nan
errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator(
self.muTest)
if (plotEst and not np.any(np.isnan(errorEstTest))
and not np.any(np.isinf(errorEstTest))):
musre = copy(self.muTest.re.data)
from matplotlib import pyplot as plt
plt.figure()
errCP = copy(errorEstTest)
while len(musre) > 0:
if self.npar == 1:
currIdx = np.arange(len(musre))
else:
currIdx = np.where(np.isclose(np.sum(
np.abs(musre[:, 1 :] - musre[0, 1 :]), 1), 0.))[0]
plt.semilogy(musre[currIdx, 0], errCP[currIdx], 'k',
linewidth = 1)
musre = np.delete(musre, currIdx, 0)
errCP = np.delete(errCP, currIdx)
plt.semilogy([self.muTest.re(0, 0), self.muTest.re(-1, 0)],
[self.greedyTol] * 2, 'r--')
plt.semilogy(self.mus.re(0),
2. * self.greedyTol * np.ones(len(self.mus)), '*m')
plt.semilogy(self.muTest.re(muidx, 0), maxErrorEst, 'xr')
plt.grid()
plt.show()
plt.close()
return errorEstTest, muidx, maxErrorEst, self.muTest[muidx]
def _preliminaryTraining(self):
"""Initialize starting snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
self.computeScaleFactor()
self.resetSamples()
self.mus = self.trainSetGenerator.generatePoints(self.S)[
list(range(self.S))]
muTestBase = self.sampler.generatePoints(self.nTestPoints)
idxPop = pruneSamples(muTestBase ** self.HFEngine.rescalingExp,
self.mus ** self.HFEngine.rescalingExp,
1e-10 * self.scaleFactor[0])
muTestBase.pop(idxPop)
muTestBase = muTestBase.sort()
muLast = copy(self.mus[-1])
self.mus.pop()
if len(self.mus) > 0:
vbMng(self, "MAIN",
("Adding first {} sample point{} at {} to training "
"set.").format(self.S - 1, "" + "s" * (self.S > 2),
self.mus), 2)
self.samplingEngine.iterSample(self.mus)
self._S = len(self.mus)
self._approxParameters["S"] = self.S
self.muTest = emptyParameterList()
self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1]))
self.muTest[: -1] = muTestBase.data
self.muTest[-1] = muLast.data
def _enrichTestSet(self, nTest:int):
"""Add extra elements to test set."""
RROMPyAssert(self._mode, message = "Cannot enrich test set.")
muTestExtra = self.sampler.generatePoints(2 * nTest)
muTotal = copy(self.mus)
muTotal.append(self.muTest)
idxPop = pruneSamples(muTestExtra ** self.HFEngine.rescalingExp,
muTotal ** self.HFEngine.rescalingExp,
1e-10 * self.scaleFactor[0])
muTestExtra.pop(idxPop)
muTestNew = np.empty((len(self.muTest) + len(muTestExtra),
self.muTest.shape[1]), dtype = np.complex)
muTestNew[: len(self.muTest)] = self.muTest.data
muTestNew[len(self.muTest) :] = muTestExtra.data
self.muTest = checkParameterList(muTestNew, self.npar)[0].sort()
vbMng(self, "MAIN",
"Enriching test set by {} elements.".format(len(muTestExtra)), 5)
def greedy(self, plotEst : bool = False):
"""Compute greedy snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
vbMng(self, "INIT", "Starting computation of snapshots.", 2)
self._preliminaryTraining()
nTest = self.nTestPoints
muT0 = copy(self.muTest[-1])
errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(
[len(self.muTest) - 1], plotEst)
self._collinearityFlag = 0
if maxErrorEst is not None and np.any(np.isnan(maxErrorEst)):
RROMPyWarning(("Instability in a posteriori estimator. "
"Starting preemptive greedy loop termination."))
self.muTest.append(muT0)
self.mus.pop(-1)
self.samplingEngine.popSample()
self.setupApprox()
else:
if maxErrorEst is not None:
max2ErrorEst = np.max(maxErrorEst)
vbMng(self, "MAIN", ("Uniform testing error estimate "
"{:.4e}.").format(max2ErrorEst), 2)
trainedModelOld = copy(self.trainedModel)
while (self.samplingEngine.nsamples < self.maxIter
and (maxErrorEst is None or max2ErrorEst > self.greedyTol)):
if (1. - self.refinementRatio) * nTest > len(self.muTest):
self._enrichTestSet(nTest)
nTest = len(self.muTest)
muTestOld = self.muTest
errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample(
muidx, plotEst)
if maxErrorEst is not None and (np.any(np.isnan(maxErrorEst))
or np.any(np.isinf(maxErrorEst))):
if self._collinearityFlag == 0:
RROMPyWarning(("Instability in a posteriori "
"estimator. Starting preemptive greedy "
"loop termination."))
self.muTest = muTestOld
self._approxParameters = (
trainedModelOld.data.approxParameters)
self._S = trainedModelOld.data.approxParameters["S"]
self._approxParameters["S"] = self.S
self.trainedModel.data = copy(trainedModelOld.data)
break
if maxErrorEst is not None:
max2ErrorEst = np.max(maxErrorEst)
vbMng(self, "MAIN", ("Uniform testing error estimate "
"{:.4e}.").format(max2ErrorEst), 2)
trainedModelOld.data = copy(self.trainedModel.data)
- if (self.interactive and maxErrorEst is not None
- and max2ErrorEst <= self.greedyTol):
- vbMng(self, "MAIN",
- ("Required tolerance {} achieved. Want to decrease "
- "greedyTol and continue? "
- "Y/N").format(self.greedyTol), 0, end = "")
- increasemaxIter = input()
- if increasemaxIter.upper() == "Y":
- vbMng(self, "MAIN", "Reducing value of greedyTol...",
- 0)
- while max2ErrorEst <= self._greedyTol:
- self._greedyTol *= .5
- if (self.interactive
- and self.samplingEngine.nsamples >= self.maxIter):
- vbMng(self, "MAIN",
- ("Maximum number of iterations {} reached. Want to "
- "increase maxIter and continue? "
- "Y/N").format(self.maxIter), 0, end = "")
- increasemaxIter = input()
- if increasemaxIter.upper() == "Y":
- vbMng(self, "MAIN", "Doubling value of maxIter...", 0)
- self._maxIter *= 2
while len(self.mus) > self.S: self.mus.pop(-1)
while self.samplingEngine.nsamples > self.S:
self.samplingEngine.popSample()
vbMng(self, "DEL",
("Done computing snapshots (final snapshot count: "
"{}).").format(self.samplingEngine.nsamples), 2)
def checkComputedApprox(self) -> bool:
"""
Check if setup of new approximant is not needed.
Returns:
True if new setup is not needed. False otherwise.
"""
return (super().checkComputedApprox()
and len(self.mus) == self.trainedModel.data.projMat.shape[1])
def assembleReducedResidualGramian(self, pMat:sampList):
"""
Build residual gramian of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
if (not hasattr(self.trainedModel.data, "gramian")
or self.trainedModel.data.gramian is None):
gramian = self.estimatorNormEngine.innerProduct(pMat, pMat)
else:
Sold = self.trainedModel.data.gramian.shape[0]
S = len(self.mus)
if Sold > S:
gramian = self.trainedModel.data.gramian[: S, : S]
else:
idxOld = list(range(Sold))
idxNew = list(range(Sold, S))
gramian = np.empty((S, S), dtype = np.complex)
gramian[: Sold, : Sold] = self.trainedModel.data.gramian
gramian[: Sold, Sold :] = (
self.estimatorNormEngine.innerProduct(pMat(idxNew),
pMat(idxOld)))
gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj()
gramian[Sold :, Sold :] = (
self.estimatorNormEngine.innerProduct(pMat(idxNew),
pMat(idxNew)))
self.trainedModel.data.gramian = gramian
def assembleReducedResidualBlocksbb(self, bs:List[Np1D]):
"""
Build blocks (of type bb) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nbs = len(bs)
if (not hasattr(self.trainedModel.data, "resbb")
or self.trainedModel.data.resbb is None):
resbb = np.empty((nbs, nbs), dtype = np.complex)
for i in range(nbs):
Mbi = bs[i]
resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi)
for j in range(i):
Mbj = bs[j]
resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj,
Mbi)
for i in range(nbs):
for j in range(i + 1, nbs):
resbb[i, j] = resbb[j, i].conj()
self.trainedModel.data.resbb = resbb
def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D],
pMat:sampList):
"""
Build blocks (of type Ab) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nAs = len(As)
nbs = len(bs)
S = len(self.mus)
if (not hasattr(self.trainedModel.data, "resAb")
or self.trainedModel.data.resAb is None):
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAb = np.empty((nbs, S, nAs), dtype = np.complex)
for j in range(nAs):
MAj = dot(As[j], pMat)
for i in range(nbs):
Mbi = bs[i]
resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj,
Mbi)
else:
Sold = self.trainedModel.data.resAb.shape[1]
if Sold == S: return
if Sold > S:
resAb = self.trainedModel.data.resAb[:, : S, :]
else:
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAb = np.empty((nbs, S, nAs), dtype = np.complex)
resAb[:, : Sold, :] = self.trainedModel.data.resAb
for j in range(nAs):
MAj = dot(As[j], pMat[:, Sold :])
for i in range(nbs):
Mbi = bs[i]
resAb[i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj, Mbi))
self.trainedModel.data.resAb = resAb
def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList):
"""
Build blocks (of type AA) of reduced linear system through projections.
"""
self.initEstimatorNormEngine()
nAs = len(As)
S = len(self.mus)
if (not hasattr(self.trainedModel.data, "resAA")
or self.trainedModel.data.resAA is None):
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAA = np.empty((S, nAs, S, nAs), dtype = np.complex)
for i in range(nAs):
MAi = dot(As[i], pMat)
resAA[:, i, :, i] = (
self.estimatorNormEngine.innerProduct(MAi, MAi))
for j in range(i):
MAj = dot(As[j], pMat)
resAA[:, i, :, j] = (
self.estimatorNormEngine.innerProduct(MAj, MAi))
for i in range(nAs):
for j in range(i + 1, nAs):
resAA[:, i, :, j] = resAA[:, j, :, i].T.conj()
else:
Sold = self.trainedModel.data.resAA.shape[0]
if Sold == S: return
if Sold > S:
resAA = self.trainedModel.data.resAA[: S, :, : S, :]
else:
if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data
resAA = np.empty((S, nAs, S, nAs), dtype = np.complex)
resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA
for i in range(nAs):
MAi = dot(As[i], pMat)
resAA[: Sold, i, Sold :, i] = (
self.estimatorNormEngine.innerProduct(MAi[:, Sold :],
MAi[:, : Sold]))
resAA[Sold :, i, : Sold, i] = resAA[: Sold, i,
Sold :, i].T.conj()
resAA[Sold :, i, Sold :, i] = (
self.estimatorNormEngine.innerProduct(MAi[:, Sold :],
MAi[:, Sold :]))
for j in range(i):
MAj = dot(As[j], pMat)
resAA[: Sold, i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, Sold :],
MAi[:, : Sold]))
resAA[Sold :, i, : Sold, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, : Sold],
MAi[:, Sold :]))
resAA[Sold :, i, Sold :, j] = (
self.estimatorNormEngine.innerProduct(MAj[:, Sold :],
MAi[:, Sold :]))
for i in range(nAs):
for j in range(i + 1, nAs):
resAA[: Sold, i, Sold :, j] = (
resAA[Sold :, j, : Sold, i].T.conj())
resAA[Sold :, i, : Sold, j] = (
resAA[: Sold, j, Sold :, i].T.conj())
resAA[Sold :, i, Sold :, j] = (
resAA[Sold :, j, Sold :, i].T.conj())
self.trainedModel.data.resAA = resAA
def assembleReducedResidualBlocks(self, full : bool = False):
"""Build affine blocks of affine decomposition of residual."""
if full:
checkIfAffine(self.HFEngine, "assemble reduced residual blocks")
else:
checkIfAffine(self.HFEngine, "assemble reduced RHS blocks", True)
self.HFEngine.buildb()
self.assembleReducedResidualBlocksbb(self.HFEngine.bs)
if full:
pMat = self.samplingEngine.samples
self.HFEngine.buildA()
self.assembleReducedResidualBlocksAb(self.HFEngine.As,
self.HFEngine.bs, pMat)
self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat)
diff --git a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py
index 1f1c066..2ed5ac8 100644
--- a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py
+++ b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py
@@ -1,468 +1,463 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.hfengines.base.linear_affine_engine import checkIfAffine
from .generic_greedy_approximant import GenericGreedyApproximant
from rrompy.utilities.poly_fitting.polynomial import (polybases,
PolynomialInterpolator as PI,
polyvanderTotal as pvT)
from rrompy.utilities.numerical import totalDegreeN, dot
from rrompy.utilities.expression import expressionEvaluator
from rrompy.reduction_methods.standard import RationalInterpolant
from rrompy.utilities.base.types import (Np1D, Tuple, DictAny, HFEng, paramVal,
paramList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.poly_fitting import customFit
from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException,
RROMPyAssert, RROMPy_FRAGILE)
from rrompy.parameter import checkParameterList
from rrompy.sampling import emptySampleList
__all__ = ['RationalInterpolantGreedy']
class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant):
"""
ROM greedy rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
defaults to 0.;
- - 'interactive': whether to interactively terminate greedy
- algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to sampler;
- 'polybasis': type of basis for interpolation; defaults to
'MONOMIAL';
- 'errorEstimatorKind': kind of error estimator; available values
include 'AFFINE', 'DISCREPANCY', 'INTERPOLATORY',
'EIM_INTERPOLATORY', and 'EIM_DIAGONAL'; defaults to
'DISCREPANCY';
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults and must
be True.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
- - 'interactive': whether to interactively terminate greedy
- algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator;
- 'errorEstimatorKind': kind of error estimator;
- 'interpRcond': tolerance for interpolation;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
collinearityTol: Collinearity tolerance for greedy algorithm.
- interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
robustTol: tolerance for robust rational denominator management.
errorEstimatorKind: kind of error estimator.
interpRcond: tolerance for interpolation.
robustTol: tolerance for robust rational denominator management.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
_allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "INTERPOLATORY",
"LOOK_AHEAD", "NONE"]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = True,
verbosity : int = 10, timestamp : bool = True):
if not approx_state: RROMPyWarning("Overriding approx_state to True.")
self._preInit()
self._addParametersToList(["errorEstimatorKind"], ["DISCREPANCY"],
toBeExcluded = ["M", "N", "polydegreetype",
"radialDirectionalWeights",
"nNearestNeighbor"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = True, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
@property
def E(self):
"""Value of E."""
self._E = self.sampleBatchIdx - 1
return self._E
@E.setter
def E(self, E):
RROMPyWarning(("E is used just to simplify inheritance, and its value "
"cannot be changed from that of sampleBatchIdx - 1."))
@property
def polydegreetype(self):
"""Value of polydegreetype."""
return "TOTAL"
@polydegreetype.setter
def polydegreetype(self, polydegreetype):
RROMPyWarning(("polydegreetype is used just to simplify inheritance, "
"and its value cannot be changed from 'TOTAL'."))
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Sample type not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def errorEstimatorKind(self):
"""Value of errorEstimatorKind."""
return self._errorEstimatorKind
@errorEstimatorKind.setter
def errorEstimatorKind(self, errorEstimatorKind):
errorEstimatorKind = errorEstimatorKind.upper()
if errorEstimatorKind not in self._allowedEstimatorKinds:
RROMPyWarning(("Error estimator kind not recognized. Overriding "
"to 'DISCREPANCY'."))
errorEstimatorKind = "DISCREPANCY"
self._errorEstimatorKind = errorEstimatorKind
self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind
def errorEstimator(self, mus:Np1D) -> Np1D:
"""Standard residual-based error estimator."""
if self.errorEstimatorKind == "AFFINE":
return super().errorEstimator(mus)
setupOK = self.setupApprox()
if not setupOK:
err = np.empty(len(mus))
err[:] = np.nan
return err
self._setupInterpolationIndices()
mus = checkParameterList(mus, self.npar)[0]
muCTest = self.trainedModel.centerNormalize(mus)
vbMng(self.trainedModel, "INIT",
"Evaluating error estimator at mu = {}.".format(mus), 10)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
QTest = self.trainedModel.getQVal(mus)
if self.errorEstimatorKind == "DISCREPANCY":
checkIfAffine(self.HFEngine,
"apply discrepancy-based error estimator")
self.HFEngine.buildA()
self.HFEngine.buildb()
nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs
muTrainEff = self.mus ** self.HFEngine.rescalingExp
muTestEff = mus ** self.HFEngine.rescalingExp
PTrain = self.trainedModel.getPVal(self.mus).data.T
QTrain = self.trainedModel.getQVal(self.mus)
PTest = self.trainedModel.getPVal(mus).data
radiusAbTrain = np.empty((self.S, nAs * self.S + nbs),
dtype = np.complex)
radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex)
radiusb = np.empty((nbs, len(mus)), dtype = np.complex)
for j, thA in enumerate(self.HFEngine.thAs):
idxs = j * self.S + np.arange(self.S)
radiusAbTrain[:, idxs] = expressionEvaluator(thA[0],
muTrainEff, (self.S, 1)) * PTrain
radiusA[:, j] = PTest * expressionEvaluator(thA[0],
muTestEff, (len(mus),))
for j, thb in enumerate(self.HFEngine.thbs):
idx = nAs * self.S + j
radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0],
muTrainEff, (self.S,))
radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff,
(len(mus),))
QRHSNorm2 = self._affineResidualMatricesContraction(radiusb)
vanTrain = pvT(self._musUniqueCN, self.E, self.polybasis0,
self._derIdxs, self._reorder)
interpPQ = customFit(vanTrain, radiusAbTrain,
rcond = self.interpRcond)
vanTest = pvT(muCTest, self.E, self.polybasis0)
DradiusAb = vanTest.dot(interpPQ)
radiusA = (radiusA
- DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T)
radiusb = radiusb - DradiusAb[:, - nbs :].T
ff, Lf, LL = self._affineResidualMatricesContraction(radiusb,
radiusA)
err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5
else: #if self.errorEstimatorKind in ["INTERPOLATORY", "LOOK_AHEAD", "NONE"]:
QTest = np.abs(QTest)
muCTest = self.trainedModel.centerNormalize(mus)
muCTrain = self.trainedModel.centerNormalize(self.mus)
vanTest = pvT(muCTest, self.E, self.polybasis)
vanTestNext = pvT(muCTest, self.E + 1, self.polybasis)[:,
vanTest.shape[1] :]
idxsTest = np.arange(vanTestNext.shape[1])
basis = np.zeros((len(idxsTest), 0), dtype = float)
idxMaxEst = []
while len(idxsTest) > 0:
vanTrial = pvT(muCTrain, self.E, self.polybasis)
vanTrialNext = pvT(muCTrain, self.E + 1, self.polybasis)[:,
vanTrial.shape[1] :]
vanTrial = np.hstack((vanTrial,
vanTrialNext.dot(basis).reshape(
len(vanTrialNext), basis.shape[1])))
valuesTrial = vanTrialNext[:, idxsTest]
vanTestEff = np.hstack((vanTest,
vanTestNext.dot(basis).reshape(
len(vanTestNext), basis.shape[1])))
vanTestNextEff = vanTestNext[:, idxsTest]
coeffTest = np.linalg.solve(vanTrial, valuesTrial)
errTest = (np.abs(vanTestNextEff - vanTestEff.dot(coeffTest))
/ np.expand_dims(QTest, 1))
if self.errorEstimatorKind == "NONE": break
idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape)
idxMaxEst += [idxMaxErr[0]]
muCTrain.append(muCTest[idxMaxErr[0]])
basis = np.pad(basis, [(0, 0), (0, 1)], "constant")
basis[idxsTest[idxMaxErr[1]], -1] = 1.
idxsTest = np.delete(idxsTest, idxMaxErr[1])
if self.errorEstimatorKind in ["INTERPOLATORY", "LOOK_AHEAD"]:
self.initEstimatorNormEngine()
mu_muTestSample = mus[idxMaxEst]
app_muTestSample = self.getApproxReduced(mu_muTestSample)
if self._mode == RROMPy_FRAGILE:
if (self.errorEstimatorKind == "INTERPOLATORY"
and not self.HFEngine.isCEye):
raise RROMPyException(("Cannot compute INTERPOLATORY "
"residual estimator in fragile "
"mode for non-scalar C."))
app_muTestSample = dot(self.trainedModel.data.projMat,
app_muTestSample.data)
else:
app_muTestSample = dot(self.samplingEngine.samples,
app_muTestSample)
if self.errorEstimatorKind == "INTERPOLATORY":
resmu = self.HFEngine.residual(mu_muTestSample,
app_muTestSample,
post_c = False)
RHSmu = self.HFEngine.residual(mu_muTestSample, None,
post_c = False)
else: #if self.errorEstimatorKind == "LOOK_AHEAD":
for j, mu in enumerate(mu_muTestSample):
self.samplingEngine.nextSample(mu)
if hasattr(self.samplingEngine, "samples_full"):
uEx = self.samplingEngine.samples_full[-1]
else:
uEx = self.samplingEngine.samples[-1]
if j == 0:
RHSmu = emptySampleList()
RHSmu.reset((len(uEx), len(mu_muTestSample)),
dtype = uEx.dtype)
RHSmu[j] = uEx
resmu = RHSmu - app_muTestSample
ressamples = (self.estimatorNormEngine.norm(resmu)
/ self.estimatorNormEngine.norm(RHSmu))
# improve the following by explicitly constructing (tensorised)
# interpolant as in while loop
musT = copy(self.mus)
musT.append(mu_muTestSample)
musT = self.trainedModel.centerNormalize(musT)
musC = self.trainedModel.centerNormalize(mus)
resT = np.zeros(len(musT), dtype = np.complex)
err = np.zeros(len(mus))
for l in range(len(mu_muTestSample)):
resT[len(self.mus) + l] = (ressamples[l]
* QTest[idxMaxEst[l]])
p = PI()
wellCond, msg = p.setupByInterpolation(musT, resT,
self.E + 1, self.polybasis,
self.verbosity >= 15, True, {},
{"rcond": self.interpRcond})
err += np.abs(p(musC))
resT[len(self.mus) + l] = 0.
err /= QTest
vbMng(self, "MAIN", msg, 15)
else: #if self.errorEstimatorKind == "NONE":
err = np.max(errTest, axis = 1)
err *= self.greedyTol / np.mean(err)
self.trainedModel.verbosity = verb
vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10)
return err
def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]:
"""
Compute maximum of (and index of maximum of) error estimator over given
parameters.
"""
errorEstTest = self.errorEstimator(mus)
idxMaxEst = np.empty(self.sampleBatchSize, dtype = int)
errCP = copy(errorEstTest)
for j in range(self.sampleBatchSize):
k = np.argmax(errCP)
idxMaxEst[j] = k
if j + 1 < self.sampleBatchSize:
musZero = self.trainedModel.centerNormalize(mus, mus[k])
errCP *= np.linalg.norm(musZero.data, axis = 1)
maxEst = errorEstTest[idxMaxEst]
return errorEstTest, idxMaxEst, maxEst
def greedyNextSample(self, muidx:int, plotEst : bool = False)\
-> Tuple[Np1D, int, float, paramVal]:
"""Compute next greedy snapshot of solution map."""
RROMPyAssert(self._mode, message = "Cannot add greedy sample.")
self.sampleBatchIdx += 1
self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx)
err, muidx, maxErr, muNext = super().greedyNextSample(muidx, plotEst)
if maxErr is not None and (np.any(np.isnan(maxErr))
or np.any(np.isinf(maxErr))):
self.sampleBatchIdx -= 1
self.sampleBatchSize = totalDegreeN(self.npar - 1,
self.sampleBatchIdx)
if (self.errorEstimatorKind == "NONE" and not np.isnan(maxErr)
and not np.isinf(maxErr)):
maxErr = None
return err, muidx, maxErr, muNext
def _preliminaryTraining(self):
"""Initialize starting snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
S = self.S
self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0
nextBatchSize = 1
while self._S + nextBatchSize <= S:
self.sampleBatchIdx += 1
self.sampleBatchSize = nextBatchSize
self._S += self.sampleBatchSize
nextBatchSize = totalDegreeN(self.npar - 1,
self.sampleBatchIdx + 1)
super()._preliminaryTraining()
def setupApprox(self, plotEst : bool = False):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return True
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.".format(self.name()), 5)
self.greedy(plotEst)
self._N, self._M = [self.E] * 2
pMat = self.samplingEngine.samples.data
pMatEff = dot(self.HFEngine.C, pMat)
if self.trainedModel is None:
self.trainedModel = self.tModelType()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
datadict = {"mu0": self.mu0, "projMat": pMatEff,
"scaleFactor": self.scaleFactor,
"rescalingExp": self.HFEngine.rescalingExp}
self.trainedModel.data = self.initializeModelData(datadict)[0]
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(pMatEff)
self.trainedModel.data.mus = copy(self.mus)
self.trainedModel.data.mus = copy(self.mus)
self.catchInstability = True
if self.N > 0:
try:
Q = self._setupDenominator()[0]
except RROMPyException as RE:
RROMPyWarning(RE._msg)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return False
else:
Q = PI()
Q.coeffs = np.ones(1, dtype = np.complex)
Q.npar = 1
Q.polybasis = self.polybasis
self.trainedModel.data.Q = copy(Q)
try:
self.trainedModel.data.P = copy(self._setupNumerator())
except RROMPyException as RE:
RROMPyWarning(RE._msg)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return False
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return True
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
super().loadTrainedModel(filename)
self.sampleBatchIdx, self.sampleBatchSize, _S = -1, 0, 0
nextBatchSize = 1
while _S + nextBatchSize <= self.S + 1:
self.sampleBatchIdx += 1
self.sampleBatchSize = nextBatchSize
_S += self.sampleBatchSize
nextBatchSize = totalDegreeN(self.npar - 1,
self.sampleBatchIdx + 1)
diff --git a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py
index 2826daf..6937d12 100644
--- a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py
+++ b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py
@@ -1,184 +1,179 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
from .generic_greedy_approximant import GenericGreedyApproximant
from rrompy.reduction_methods.standard import ReducedBasis
from rrompy.utilities.base.types import DictAny, HFEng, paramVal
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import dot
from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyAssert
__all__ = ['ReducedBasisGreedy']
class ReducedBasisGreedy(GenericGreedyApproximant, ReducedBasis):
"""
ROM greedy RB approximant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
defaults to 0.;
- - 'interactive': whether to interactively terminate greedy
- algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to sampler.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults and must
be True.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
- - 'interactive': whether to interactively terminate greedy
- algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
collinearityTol: Collinearity tolerance for greedy algorithm.
- interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
As: List of sparse matrices (in CSC format) representing coefficients
of linear system matrix.
bs: List of numpy vectors representing coefficients of linear system
RHS.
ARBs: List of sparse matrices (in CSC format) representing coefficients
of compressed linear system matrix.
bRBs: List of numpy vectors representing coefficients of compressed
linear system RHS.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = True,
verbosity : int = 10, timestamp : bool = True):
if not approx_state: RROMPyWarning("Overriding approx_state to True.")
self._preInit()
self._addParametersToList(toBeExcluded = ["R", "PODTolerance"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = True, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
@property
def R(self):
"""Value of R."""
self._R = self._S
return self._R
@R.setter
def R(self, R):
RROMPyWarning(("R is used just to simplify inheritance, and its value "
"cannot be changed from that of S."))
@property
def PODTolerance(self):
"""Value of PODTolerance."""
self._PODTolerance = -1
return self._PODTolerance
@PODTolerance.setter
def PODTolerance(self, PODTolerance):
RROMPyWarning(("PODTolerance is used just to simplify inheritance, "
"and its value cannot be changed from -1."))
def setupApprox(self, plotEst : bool = False):
"""Compute RB projection matrix."""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.greedy(plotEst)
vbMng(self, "INIT", "Computing projection matrix.", 7)
pMat = self.samplingEngine.samples.data
pMatEff = dot(self.HFEngine.C, pMat)
if self.trainedModel is None:
self.trainedModel = self.tModelType()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
datadict = {"mu0": self.mu0, "projMat": pMatEff,
"scaleFactor": self.scaleFactor,
"rescalingExp": self.HFEngine.rescalingExp}
data = self.initializeModelData(datadict)[0]
ARBs, bRBs = self.assembleReducedSystem(pMat)
data.affinePoly = self.HFEngine.affinePoly
self.HFEngine.buildA()
self.HFEngine.buildb()
data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
Sold = self.trainedModel.data.projMat.shape[1]
ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :],
pMat[:, : Sold])
self.trainedModel.data.projMat = copy(pMatEff)
self.trainedModel.data.mus = copy(self.mus)
self.trainedModel.data.ARBs = ARBs
self.trainedModel.data.bRBs = bRBs
vbMng(self, "DEL", "Done computing projection matrix.", 7)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)