Page MenuHomec4science

rational_interpolant_greedy.py
No OneTemporary

File Metadata

Created
Mon, May 6, 09:04

rational_interpolant_greedy.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from copy import deepcopy as copy
import numpy as np
from .generic_greedy_approximant import (GenericGreedyApproximant,
localL2Distance as lL2D)
from rrompy.utilities.poly_fitting.polynomial import (polybases, polydomcoeff,
PolynomialInterpolator as PI,
polyvanderTotal as pvT)
from rrompy.utilities.numerical import totalDegreeN, dot
from rrompy.utilities.expression import expressionEvaluator
from rrompy.reduction_methods.standard import RationalInterpolant
from rrompy.utilities.base.types import (Np1D, Tuple, DictAny, HFEng, paramVal,
paramList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.poly_fitting import customFit
from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException,
RROMPyAssert, RROMPy_FRAGILE)
from rrompy.parameter import checkParameterList
__all__ = ['RationalInterpolantGreedy']
class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant):
"""
ROM greedy rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': number of starting training points;
- 'sampler': sample point generator;
- 'greedyTol': uniform error tolerance for greedy algorithm;
defaults to 1e-2;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
defaults to 0.;
- 'interactive': whether to interactively terminate greedy
algorithm; defaults to False;
- 'maxIter': maximum number of greedy steps; defaults to 1e2;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement; defaults to 0.2;
- 'nTestPoints': number of test points; defaults to 5e2;
- 'trainSetGenerator': training sample points generator; defaults
to sampler;
- 'polybasis': type of basis for interpolation; defaults to
'MONOMIAL';
- 'errorEstimatorKind': kind of error estimator; available values
include 'AFFINE', 'DISCREPANCY', 'INTERPOLATORY',
'EIM_INTERPOLATORY', and 'EIM_DIAGONAL'; defaults to 'AFFINE';
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults and must
be True.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots.
- 'greedyTol': uniform error tolerance for greedy algorithm;
- 'collinearityTol': collinearity tolerance for greedy algorithm;
- 'interactive': whether to interactively terminate greedy
algorithm;
- 'maxIter': maximum number of greedy steps;
- 'refinementRatio': ratio of test points to be exhausted before
test set refinement;
- 'nTestPoints': number of test points;
- 'trainSetGenerator': training sample points generator;
- 'errorEstimatorKind': kind of error estimator;
- 'interpRcond': tolerance for interpolation;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: whether to compute POD of snapshots.
S: number of test points.
sampler: Sample point generator.
greedyTol: uniform error tolerance for greedy algorithm.
collinearityTol: Collinearity tolerance for greedy algorithm.
interactive: whether to interactively terminate greedy algorithm.
maxIter: maximum number of greedy steps.
refinementRatio: ratio of training points to be exhausted before
training set refinement.
nTestPoints: number of starting training points.
trainSetGenerator: training sample points generator.
robustTol: tolerance for robust rational denominator management.
errorEstimatorKind: kind of error estimator.
interpRcond: tolerance for interpolation.
robustTol: tolerance for robust rational denominator management.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
estimatorNormEngine: Engine for estimator norm computation.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
_allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "INTERPOLATORY",
"EIM_INTERPOLATORY", "EIM_DIAGONAL"]
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, approx_state : bool = True,
verbosity : int = 10, timestamp : bool = True):
if not approx_state: RROMPyWarning("Overriding approx_state to True.")
self._preInit()
self._addParametersToList(["errorEstimatorKind"], ["AFFINE"],
toBeExcluded = ["M", "N", "polydegreetype",
"radialDirectionalWeights",
"nNearestNeighbor"])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
approx_state = True, verbosity = verbosity,
timestamp = timestamp)
self._postInit()
@property
def E(self):
"""Value of E."""
self._E = self.sampleBatchIdx - 1
return self._E
@E.setter
def E(self, E):
RROMPyWarning(("E is used just to simplify inheritance, and its value "
"cannot be changed from that of sampleBatchIdx - 1."))
@property
def polydegreetype(self):
"""Value of polydegreetype."""
return "TOTAL"
@polydegreetype.setter
def polydegreetype(self, polydegreetype):
RROMPyWarning(("polydegreetype is used just to simplify inheritance, "
"and its value cannot be changed from 'TOTAL'."))
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Sample type not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def errorEstimatorKind(self):
"""Value of errorEstimatorKind."""
return self._errorEstimatorKind
@errorEstimatorKind.setter
def errorEstimatorKind(self, errorEstimatorKind):
errorEstimatorKind = errorEstimatorKind.upper()
if errorEstimatorKind not in self._allowedEstimatorKinds:
RROMPyWarning(("Error estimator kind not recognized. Overriding "
"to 'AFFINE'."))
errorEstimatorKind = "AFFINE"
self._errorEstimatorKind = errorEstimatorKind
self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind
def errorEstimator(self, mus:Np1D) -> Np1D:
"""Standard residual-based error estimator."""
if self.errorEstimatorKind == "AFFINE":
return super().errorEstimator(mus)
setupOK = self.setupApprox()
if not setupOK:
err = np.empty(len(mus))
err[:] = np.nan
return err
if self.errorEstimatorKind == "DIAGONAL":
return self.errorEstimatorEIM(mus)
self._setupInterpolationIndices()
mus = checkParameterList(mus, self.npar)[0]
muCTest = self.trainedModel.centerNormalize(mus)
vbMng(self.trainedModel, "INIT",
"Evaluating error estimator at mu = {}.".format(mus), 10)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
QTest = self.trainedModel.getQVal(mus)
if self.errorEstimatorKind == "DISCREPANCY":
nAs, nbs = len(self.HFEngine.thAs), len(self.HFEngine.thbs)
muTrainEff = self.mus ** self.HFEngine.rescalingExp
muTestEff = mus ** self.HFEngine.rescalingExp
PTrain = self.trainedModel.getPVal(self.mus).data.T
QTrain = self.trainedModel.getQVal(self.mus)
PTest = self.trainedModel.getPVal(mus).data
radiusAbTrain = np.empty((self.S, nAs * self.S + nbs),
dtype = np.complex)
radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex)
radiusb = np.empty((nbs, len(mus)), dtype = np.complex)
for j, thA in enumerate(self.HFEngine.thAs):
idxs = j * self.S + np.arange(self.S)
radiusAbTrain[:, idxs] = expressionEvaluator(thA[0],
muTrainEff, (self.S, 1)) * PTrain
radiusA[:, j] = PTest * expressionEvaluator(thA[0],
muTestEff, (len(mus),))
for j, thb in enumerate(self.HFEngine.thbs):
idx = nAs * self.S + j
radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0],
muTrainEff, (self.S,))
radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff,
(len(mus),))
QRHSNorm2 = self._affineResidualMatricesContraction(radiusb)
vanTrain, _, vanTrainIdxs = pvT(self._musUniqueCN, self.E,
self.polybasis0, self._derIdxs,
self._reorder)
interpPQ = customFit(vanTrain[:, vanTrainIdxs], radiusAbTrain,
rcond = self.interpRcond)
vanTest, _, vanTestIdxs = pvT(muCTest, self.E, self.polybasis0)
DradiusAb = vanTest[:, vanTestIdxs].dot(interpPQ)
radiusA = (radiusA
- DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T)
radiusb = radiusb - DradiusAb[:, - nbs :].T
ff, Lf, LL = self._affineResidualMatricesContraction(radiusb,
radiusA)
err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5
else: #if self.errorEstimatorKind == "INTERPOLATORY":
muCTrain = self.trainedModel.centerNormalize(self.mus)
samplingRatio = np.prod(lL2D(muCTest.data, muCTrain.data),
axis = 1) / np.abs(QTest)
self.initEstimatorNormEngine()
QTest = np.abs(QTest)
sampRCP = copy(samplingRatio)
idx_muTestSample = np.empty(self.sampleBatchSize,
dtype = int)
for j in range(self.sampleBatchSize):
k = np.argmax(sampRCP)
idx_muTestSample[j] = k
if j + 1 < self.sampleBatchSize:
musZero = self.trainedModel.centerNormalize(mus, mus[k])
sampRCP *= np.linalg.norm(musZero.data, axis = 1)
mu_muTestSample = mus[idx_muTestSample]
app_muTestSample = self.getApproxReduced(mu_muTestSample)
if self._mode == RROMPy_FRAGILE:
if not self.HFEngine.isCEye:
raise RROMPyException(("Cannot compute INTERPOLATORY "
"residual estimator in fragile "
"mode for non-scalar C."))
app_muTestSample = dot(self.trainedModel.data.projMat,
app_muTestSample.data)
else:
app_muTestSample = dot(self.samplingEngine.samples,
app_muTestSample)
resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample,
post_c = False)
RHSmu = self.HFEngine.residual(mu_muTestSample, None,
post_c = False)
ressamples = (self.estimatorNormEngine.norm(resmu)
/ self.estimatorNormEngine.norm(RHSmu))
musT = copy(self.mus)
musT.append(mu_muTestSample)
musT = self.trainedModel.centerNormalize(musT)
musC = self.trainedModel.centerNormalize(mus)
resT = np.zeros(len(musT), dtype = np.complex)
err = np.zeros(len(mus))
for l in range(len(mu_muTestSample)):
resT[len(self.mus) + l] = (ressamples[l]
* QTest[idx_muTestSample[l]])
p = PI()
wellCond, msg = p.setupByInterpolation(musT, resT, self.E + 1,
self.polybasis, self.verbosity >= 15,
True, {}, {"rcond": self.interpRcond})
err += np.abs(p(musC))
resT[len(self.mus) + l] = 0.
err /= QTest
vbMng(self, "MAIN", msg, 15)
self.trainedModel.verbosity = verb
vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10)
return err
def errorEstimatorEIM(self, mus:Np1D,
return_max_idxs : bool = False) -> Np1D:
"""EIM-like interpolation error estimator."""
setupOK = self.setupApprox()
if not setupOK:
err = np.empty(len(mus))
err[:] = np.nan
return err
self._setupInterpolationIndices()
mus = checkParameterList(mus, self.npar)[0]
vbMng(self.trainedModel, "INIT",
"Evaluating error estimator at mu = {}.".format(mus), 10)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
QTest = self.trainedModel.getQVal(mus)
muCTest = self.trainedModel.centerNormalize(mus)
muCTrain = self.trainedModel.centerNormalize(self.mus)
vanderTest, _, vanderTestR = pvT(muCTest, self.E, self.polybasis)
vanderTest = vanderTest[:, vanderTestR]
vanderTestNext, _, vanderTestNextR = pvT(muCTest, self.E + 1,
self.polybasis)
vanderTestNext = vanderTestNext[:, vanderTestNextR[
vanderTest.shape[1] :]]
idxsTest = np.arange(vanderTestNext.shape[1])
basis = np.zeros((len(idxsTest), 0), dtype = float)
idxMaxEst = []
err = None
while len(idxsTest) > 0:
vanderTrial, _, vanderTrialR = pvT(muCTrain, self.E,
self.polybasis)
vanderTrial = vanderTrial[:, vanderTrialR]
vanderTrialNext, _, vanderTrialNextR = pvT(muCTrain, self.E + 1,
self.polybasis)
vanderTrialNext = vanderTrialNext[:, vanderTrialNextR[
vanderTrial.shape[1] :]]
vanderTrial = np.hstack((vanderTrial,
vanderTrialNext.dot(basis).reshape(
len(vanderTrialNext),
basis.shape[1])))
valuesTrial = vanderTrialNext[:, idxsTest]
vanderTestEff = np.hstack((vanderTest,
vanderTestNext.dot(basis).reshape(
len(vanderTestNext),
basis.shape[1])))
vanderTestNextEff = vanderTestNext[:, idxsTest]
coeffTest = np.linalg.solve(vanderTrial, valuesTrial)
errTest = np.abs((vanderTestNextEff - vanderTestEff.dot(coeffTest))
/ np.expand_dims(QTest, 1))
idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape)
idxMaxEst += [idxMaxErr[0]]
if err is None: err = np.max(errTest, axis = 1)
if not return_max_idxs: break
muCTrain.append(muCTest[idxMaxErr[0]])
basis = np.pad(basis, [(0, 0), (0, 1)], "constant")
basis[idxsTest[idxMaxErr[1]], -1] = 1.
idxsTest = np.delete(idxsTest, idxMaxErr[1])
if self.errorEstimatorKind == "EIM_DIAGONAL":
self.assembleReducedResidualBlocks(full = False)
muTestEff = mus ** self.HFEngine.rescalingExp
radiusb = np.empty((len(self.HFEngine.thbs), len(mus)),
dtype = np.complex)
for j, thb in enumerate(self.HFEngine.thbs):
radiusb[j] = expressionEvaluator(thb[0], muTestEff)
bresb = self._affineResidualMatricesContraction(radiusb)
self.assembleReducedResidualGramian(self.trainedModel.data.projMat)
pDom = (polydomcoeff(self.E, self.polybasis)
* self.trainedModel.data.P[(-1,) + (0,) * (self.npar - 1)])
LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom))
if not hasattr(self, "Anorm2Approx"):
if self.HFEngine.nAs > 1:
Ader = self.HFEngine.A(self.mu0,
[1] + [0] * (self.npar - 1))
try:
Adiag = self.scaleFactor[0] * Ader.diagonal()
except:
Adiag = self.scaleFactor[0] * np.diagonal(Ader)
self.Anorm2Approx = np.mean(np.abs(Adiag) ** 2.)
if (np.isclose(self.Anorm2Approx, 0.)
or self.HFEngine.nAs <= 1):
self.Anorm2Approx = 1
jOpt = np.abs(self.Anorm2Approx * LL / bresb) ** .5
err = jOpt * err
else: #if self.errorEstimatorKind == "EIM_INTERPOLATORY":
self.initEstimatorNormEngine()
mu_muTestSample = mus[idxMaxEst[0]]
app_muTestSample = self.getApproxReduced(mu_muTestSample)
if self._mode == RROMPy_FRAGILE:
if not self.HFEngine.isCEye:
raise RROMPyException(("Cannot compute EIM_INTERPOLATORY "
"residual estimator in fragile "
"mode for non-scalar C."))
app_muTestSample = dot(self.trainedModel.data.projMat,
app_muTestSample.data)
else:
app_muTestSample = dot(self.samplingEngine.samples,
app_muTestSample)
resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample,
post_c = False)
RHSmu = self.HFEngine.residual(mu_muTestSample, None,
post_c = False)
jOpt = np.abs(self.estimatorNormEngine.norm(resmu)[0]
/ err[idxMaxEst[0]]
/ self.estimatorNormEngine.norm(RHSmu)[0])
err = jOpt * err
self.trainedModel.verbosity = verb
vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10)
if return_max_idxs: return err, idxMaxEst
return err
def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]:
"""
Compute maximum of (and index of maximum of) error estimator over given
parameters.
"""
if self.errorEstimatorKind[: 4] == "EIM_":
errorEstTest, idxMaxEst = self.errorEstimatorEIM(mus, True)
else:
errorEstTest = self.errorEstimator(mus)
idxMaxEst = np.empty(self.sampleBatchSize, dtype = int)
errCP = copy(errorEstTest)
for j in range(self.sampleBatchSize):
k = np.argmax(errCP)
idxMaxEst[j] = k
if j + 1 < self.sampleBatchSize:
musZero = self.trainedModel.centerNormalize(mus, mus[k])
errCP *= np.linalg.norm(musZero.data, axis = 1)
maxEst = errorEstTest[idxMaxEst]
return errorEstTest, idxMaxEst, maxEst
def greedyNextSample(self, muidx:int, plotEst : bool = False)\
-> Tuple[Np1D, int, float, paramVal]:
"""Compute next greedy snapshot of solution map."""
RROMPyAssert(self._mode, message = "Cannot add greedy sample.")
self.sampleBatchIdx += 1
self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx)
return super().greedyNextSample(muidx, plotEst)
def _preliminaryTraining(self):
"""Initialize starting snapshots of solution map."""
RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.")
if self.samplingEngine.nsamples > 0:
return
S = self.S
self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0
nextBatchSize = 1
while self._S + nextBatchSize <= S:
self.sampleBatchIdx += 1
self.sampleBatchSize = nextBatchSize
self._S += self.sampleBatchSize
nextBatchSize = totalDegreeN(self.npar - 1,
self.sampleBatchIdx + 1)
super()._preliminaryTraining()
def setupApprox(self, plotEst : bool = False):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return True
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.".format(self.name()), 5)
self.greedy(plotEst)
self._N, self._M = [self.E] * 2
pMat = self.samplingEngine.samples.data
pMatEff = dot(self.HFEngine.C, pMat)
if self.trainedModel is None:
self.trainedModel = self.tModelType()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
datadict = {"mu0": self.mu0, "projMat": pMatEff,
"scaleFactor": self.scaleFactor,
"rescalingExp": self.HFEngine.rescalingExp}
self.trainedModel.data = self.initializeModelData(datadict)[0]
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(pMatEff)
self.trainedModel.data.mus = copy(self.mus)
self.trainedModel.data.mus = copy(self.mus)
self.catchInstability = True
if self.N > 0:
try:
Q = self._setupDenominator()[0]
except RROMPyException as RE:
RROMPyWarning(RE._msg)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return False
else:
Q = PI()
Q.coeffs = np.ones(1, dtype = np.complex)
Q.npar = 1
Q.polybasis = self.polybasis
self.trainedModel.data.Q = copy(Q)
try:
self.trainedModel.data.P = copy(self._setupNumerator())
except RROMPyException as RE:
RROMPyWarning(RE._msg)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return False
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return True
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
super().loadTrainedModel(filename)
self.sampleBatchIdx, self.sampleBatchSize, _S = -1, 0, 0
nextBatchSize = 1
while _S + nextBatchSize <= self.S + 1:
self.sampleBatchIdx += 1
self.sampleBatchSize = nextBatchSize
_S += self.sampleBatchSize
nextBatchSize = totalDegreeN(self.npar - 1,
self.sampleBatchIdx + 1)

Event Timeline