Page MenuHomec4science

rational_interpolant.py
No OneTemporary

File Metadata

Created
Sat, May 4, 18:39

rational_interpolant.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from .generic_distributed_approximant import GenericDistributedApproximant
from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname,
nextDerivativeIndices,
hashDerivativeToIdx as hashD,
hashIdxToDerivative as hashI,
homogeneizedpolyvander as hpvP,
homogeneizedToFull,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (rbbases,
RadialBasisInterpolator as RBI)
from rrompy.reduction_methods.trained_model import (
TrainedModelRational as tModel)
from rrompy.reduction_methods.trained_model import TrainedModelData
from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple,
List, paramVal, paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import multifactorial, customPInv
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
__all__ = ['RationalInterpolant']
class RationalInterpolant(GenericDistributedApproximant):
"""
ROM rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator;
- 'polybasis': type of polynomial basis for interpolation; allowed
values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults
to 'MONOMIAL';
- 'E': number of derivatives used to compute interpolant; defaults
to 0;
- 'M': degree of rational interpolant numerator; defaults to 0;
- 'N': degree of rational interpolant denominator; defaults to 0;
- 'radialBasis': radial basis family for interpolant numerator;
defaults to 0, i.e. no radial basis;
- 'radialBasisWeights': radial basis weights for interpolant
numerator; defaults to 0, i.e. identity;
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0.
Defaults to empty dict.
homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults
to False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
homogeneized: Whether to homogeneize Dirichlet BCs.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'polybasis': type of polynomial basis for interpolation;
- 'E': number of derivatives used to compute interpolant;
- 'M': degree of rational interpolant numerator;
- 'N': degree of rational interpolant denominator;
- 'radialBasis': radial basis family for interpolant numerator;
- 'radialBasisWeights': radial basis weights for interpolant
numerator;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
- 'robustTol': tolerance for robust rational denominator
management.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
POD: Whether to compute POD of snapshots.
S: Number of solution snapshots over which current approximant is
based upon.
sampler: Sample point generator.
polybasis: type of polynomial basis for interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
radialBasis: Radial basis family for interpolant numerator.
radialBasisWeights: Radial basis weights for interpolant numerator.
interpRcond: Tolerance for interpolation via numpy.polyfit.
robustTol: Tolerance for robust rational denominator management.
E: Complete derivative depth level of S.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
approxParameters : DictAny = {}, homogeneized : bool = False,
verbosity : int = 10, timestamp : bool = True):
self._preInit()
self._addParametersToList(["polybasis", "E", "M", "N", "radialBasis",
"radialBasisWeights", "interpRcond",
"robustTol"],
["MONOMIAL", -1, 0, 0, 0, 1, -1, 0])
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
homogeneized = homogeneized,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in polybases:
raise RROMPyException("Prescribed polybasis not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def radialBasis(self):
"""Value of radialBasis."""
return self._radialBasis
@radialBasis.setter
def radialBasis(self, radialBasis):
try:
if radialBasis != 0:
radialBasis = radialBasis.upper().strip().replace(" ","")
if radialBasis not in rbbases:
raise RROMPyException(("Prescribed radialBasis not "
"recognized."))
self._radialBasis = radialBasis
except:
RROMPyWarning(("Prescribed radialBasis not recognized. Overriding "
"to 0."))
self._radialBasis = 0
self._approxParameters["radialBasis"] = self.radialBasis
@property
def polybasisP(self):
if self.radialBasis == 0:
return self._polybasis
return self._polybasis + "_" + self.radialBasis
@property
def interpRcond(self):
"""Value of interpRcond."""
return self._interpRcond
@interpRcond.setter
def interpRcond(self, interpRcond):
self._interpRcond = interpRcond
self._approxParameters["interpRcond"] = self.interpRcond
@property
def radialBasisWeights(self):
"""Value of radialBasisWeights."""
return self._radialBasisWeights
@radialBasisWeights.setter
def radialBasisWeights(self, radialBasisWeights):
self._radialBasisWeights = radialBasisWeights
self._approxParameters["radialBasisWeights"] = self.radialBasisWeights
@property
def M(self):
"""Value of M. Its assignment may change S."""
return self._M
@M.setter
def M(self, M):
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
if hasattr(self, "_E") and self.E >= 0 and self.E < self.M:
RROMPyWarning("Prescribed S is too small. Decreasing M.")
self.M = self.E
@property
def N(self):
"""Value of N. Its assignment may change S."""
return self._N
@N.setter
def N(self, N):
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
if hasattr(self, "_E") and self.E >= 0 and self.E < self.N:
RROMPyWarning("Prescribed S is too small. Decreasing N.")
self.N = self.E
@property
def E(self):
"""Value of E."""
return self._E
@E.setter
def E(self, E):
if E < 0:
if not hasattr(self, "_S"):
raise RROMPyException(("Value of E must be positive if S is "
"not yet assigned."))
E = np.sum(hashI(np.prod(self.S), self.npar)) - 1
self._E = E
self._approxParameters["E"] = self.E
if (hasattr(self, "_S")
and self.E >= np.sum(hashI(np.prod(self.S), self.npar))):
RROMPyWarning("Prescribed S is too small. Decreasing E.")
self.E = -1
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
@property
def robustTol(self):
"""Value of tolerance for robust rational denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def S(self):
"""Value of S."""
return self._S
@S.setter
def S(self, S):
GenericDistributedApproximant.S.fset(self, S)
if hasattr(self, "_M"): self.M = self.M
if hasattr(self, "_N"): self.N = self.N
if hasattr(self, "_E"): self.E = self.E
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musUniqueCN = None
self._derIdxs = None
self._reorder = None
def _setupInterpolationIndices(self):
"""Setup parameters for polyvander."""
RROMPyAssert(self._mode,
message = "Cannot setup interpolation indices.")
if self._musUniqueCN is None or len(self._reorder) != len(self.mus):
self._musUniqueCN, musIdxsTo, musIdxs, musCount = (
self.centerNormalize(self.mus).unique(return_index = True,
return_inverse = True,
return_counts = True))
self._musUnique = self.mus[musIdxsTo]
self._derIdxs = [None] * len(self._musUniqueCN)
self._reorder = np.empty(len(musIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musCount):
self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1],
cnt)
jIdx = np.nonzero(musIdxs == j)[0]
self._reorder[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupDenominator(self):
"""Compute rational denominator."""
RROMPyAssert(self._mode, message = "Cannot setup denominator.")
vbMng(self, "INIT", "Starting computation of denominator.", 7)
while self.N > 0:
invD = self._computeInterpolantInverseBlocks()
if self.POD:
ev, eV = self.findeveVGQR(self.samplingEngine.RPOD, invD)
else:
ev, eV = self.findeveVGExplicit(self.samplingEngine.samples,
invD)
newParams = checkRobustTolerance(ev, self.N, self.robustTol)
if not newParams:
break
self.approxParameters = newParams
if self.N <= 0:
self._N = 0
eV = np.ones((1, 1))
q = PI()
q.npar = self.npar
q.polybasis = self.polybasis
q.coeffs = homogeneizedToFull(tuple([self.N + 1] * self.npar),
self.npar, eV[:, 0])
vbMng(self, "DEL", "Done computing denominator.", 7)
return q
def _setupNumerator(self):
"""Compute rational numerator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
vbMng(self, "INIT", "Starting computation of numerator.", 7)
Qevaldiag = np.zeros((len(self.mus), len(self.mus)),
dtype = np.complex)
verb = self.trainedModel.verbosity
self.trainedModel.verbosity = 0
self._setupInterpolationIndices()
idxGlob = 0
for j, derIdxs in enumerate(self._derIdxs):
nder = len(derIdxs)
idxLoc = np.arange(len(self.mus))[(self._reorder >= idxGlob)
* (self._reorder < idxGlob + nder)]
idxGlob += nder
Qval = [0] * nder
for der in range(nder):
derIdx = hashI(der, self.npar)
Qval[der] = (self.trainedModel.getQVal(
self._musUnique[j], derIdx,
scl = np.power(self.scaleFactor, -1.))
/ multifactorial(derIdx))
for derU, derUIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)]
if all([x >= 0 for x in diffIdx]):
diffj = hashD(diffIdx)
Qevaldiag[idxLoc[derU], idxLoc[derQ]] = Qval[diffj]
if self.POD:
Qevaldiag = Qevaldiag.dot(self.samplingEngine.RPOD.T)
self.trainedModel.verbosity = verb
while self.M >= 0:
if self.radialBasis == 0:
p = PI()
wellCond, msg = p.setupByInterpolation(
self._musUniqueCN, Qevaldiag, self.M,
self.polybasisP, self.verbosity >= 5, True,
{"derIdxs": self._derIdxs, "reorder": self._reorder,
"scl": np.power(self.scaleFactor, -1.)},
{"rcond": self.interpRcond})
else:
p = RBI()
wellCond, msg = p.setupByInterpolation(
self._musUniqueCN, Qevaldiag, self.M, self.polybasisP,
self.radialBasisWeights, self.verbosity >= 5, True,
{"derIdxs": self._derIdxs, "reorder": self._reorder,
"scl": np.power(self.scaleFactor, -1.)},
{"rcond": self.interpRcond})
vbMng(self, "MAIN", msg, 5)
if wellCond: break
RROMPyWarning("Polyfit is poorly conditioned. Reducing M by 1.")
self.M -= 1
if self.M < 0:
raise RROMPyException(("Instability in computation of numerator. "
"Aborting."))
vbMng(self, "DEL", "Done computing numerator.", 7)
return p
def setupApprox(self):
"""
Compute rational interpolant.
SVD-based robust eigenvalue management.
"""
if self.checkComputedApprox():
return
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.computeScaleFactor()
self.computeSnapshots()
if self.trainedModel is None:
self.trainedModel = tModel()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
data = TrainedModelData(self.trainedModel.name(), self.mu0,
self.samplingEngine.samples,
self.scaleFactor,
self.HFEngine.rescalingExp)
data.mus = copy(self.mus)
self.trainedModel.data = data
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(self.samplingEngine.samples)
if self.N > 0:
Q = self._setupDenominator()
else:
Q = PI()
Q.coeffs = np.ones(tuple([1] * self.npar), dtype = np.complex)
Q.npar = self.npar
Q.polybasis = self.polybasis
self.trainedModel.data.Q = Q
self.trainedModel.data.P = self._setupNumerator()
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
def _computeInterpolantInverseBlocks(self) -> List[Np2D]:
"""
Compute inverse factors for minimal interpolant target functional.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self._setupInterpolationIndices()
while self.E >= 0:
eWidth = (hashD([self.E + 1] + [0] * (self.npar - 1))
- hashD([self.E] + [0] * (self.npar - 1)))
TE, _, argIdxs = hpvP(self._musUniqueCN, self.E, self.polybasis,
self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
fitOut = customPInv(TE[:, argIdxs], rcond = self.interpRcond,
full = True)
vbMng(self, "MAIN",
("Fitting {} samples with degree {} through {}... "
"Conditioning of pseudoinverse system: {:.4e}.").format(
TE.shape[0], self.E,
polyfitname(self.polybasis),
fitOut[1][1][0] / fitOut[1][1][-1]),
5)
if fitOut[1][0] == len(argIdxs):
self._fitinv = fitOut[0][- eWidth : , :]
break
RROMPyWarning("Polyfit is poorly conditioned. Reducing E by 1.")
self.E -= 1
if self.E < 0:
raise RROMPyException(("Instability in computation of "
"denominator. Aborting."))
TN, _, argIdxs = hpvP(self._musUniqueCN, self.N, self.polybasis,
self._derIdxs, self._reorder,
scl = np.power(self.scaleFactor, -1.))
TN = TN[:, argIdxs]
invD = [None] * (eWidth)
for k in range(eWidth):
pseudoInv = np.diag(self._fitinv[k, :])
idxGlob = 0
for j, derIdxs in enumerate(self._derIdxs):
nder = len(derIdxs)
idxGlob += nder
if nder > 1:
idxLoc = np.arange(len(self.mus))[
(self._reorder >= idxGlob - nder)
* (self._reorder < idxGlob)]
invLoc = self._fitinv[k, idxLoc]
pseudoInv[np.ix_(idxLoc, idxLoc)] = 0.
for diffj, diffjIdx in enumerate(derIdxs):
for derQ, derQIdx in enumerate(derIdxs):
derUIdx = [x - y for (x, y) in
zip(diffjIdx, derQIdx)]
if all([x >= 0 for x in derUIdx]):
derU = hashD(derUIdx)
pseudoInv[idxLoc[derU], idxLoc[derQ]] = (
invLoc[diffj])
invD[k] = pseudoInv.dot(TN)
return invD
def findeveVGExplicit(self, sampleE:sampList,
invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of rational denominator
matrix.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
eWidth = len(invD)
vbMng(self, "INIT", "Building gramian matrix.", 10)
gramian = self.HFEngine.innerProduct(sampleE, sampleE)
G = np.zeros((nEnd, nEnd), dtype = np.complex)
for k in range(eWidth):
G += invD[k].T.conj().dot(gramian.dot(invD[k]))
vbMng(self, "DEL", "Done building gramian.", 10)
vbMng(self, "INIT", "Solving eigenvalue problem for gramian matrix.",
7)
ev, eV = np.linalg.eigh(G)
vbMng(self, "MAIN",
("Solved eigenvalue problem of size {} with condition number "
"{:.4e}.").format(nEnd, ev[-1] / ev[0]), 5)
vbMng(self, "DEL", "Done solving eigenvalue problem.", 7)
return ev, eV
def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of rational denominator matrix
through SVD of R factor.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
S = RPODE.shape[0]
eWidth = len(invD)
vbMng(self, "INIT", "Building half-gramian matrix stack.", 10)
Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex)
for k in range(eWidth):
Rstack[k * S : (k + 1) * S, :] = RPODE.dot(invD[k])
vbMng(self, "DEL", "Done building half-gramian.", 10)
vbMng(self, "INIT", "Solving svd for square root of gramian matrix.",
7)
_, s, eV = np.linalg.svd(Rstack, full_matrices = False)
ev = s[::-1]
eV = eV[::-1, :].T.conj()
vbMng(self, "MAIN",
("Solved svd problem of size {} x {} with condition number "
"{:.4e}.").format(*Rstack.shape, s[0] / s[-1]), 5)
vbMng(self, "DEL", "Done solving svd.", 7)
return ev, eV
def centerNormalize(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.mu0.
Returns:
Normalized parameter.
"""
return self.trainedModel.centerNormalize(mu, mu0)
def getResidues(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues(*args, **kwargs)

Event Timeline