Page MenuHomec4science

generic_pivoted_approximant.py
No OneTemporary

File Metadata

Created
Sat, May 4, 10:02

generic_pivoted_approximant.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from copy import deepcopy as copy
from rrompy.reduction_methods.base.generic_approximant import (
GenericApproximant)
from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.poly_fitting.moving_least_squares import (
polybases as mlspb,
MovingLeastSquaresInterpolator as MLSI)
from rrompy.sampling.pivoted import (SamplingEnginePivoted,
SamplingEnginePivotedPOD)
from rrompy.utilities.base.types import (ListAny, DictAny, HFEng, paramVal,
paramList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import (fullDegreeN, totalDegreeN,
nextDerivativeIndices)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import emptyParameterList
__all__ = ['GenericPivotedApproximant']
class GenericPivotedApproximant(GenericApproximant):
"""
ROM pivoted approximant (with pole matching) computation for parametric
problems (ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
directionPivot(optional): Pivot components. Defaults to [0].
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'matchingWeight': weight for pole matching optimization; defaults
to 1;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
defaults to np.inf;
- 'cutOffType': rule for tolerance computation for parasitic poles;
defaults to 'MAGNITUDE';
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV'
and 'LEGENDRE'; defaults to 'MONOMIAL';
- 'MMarginal': degree of marginal interpolant; defaults to 0;
- 'polydegreetypeMarginal': type of polynomial degree for marginal;
defaults to 'TOTAL';
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant; defaults to 0, i.e. identity;
- 'nNearestNeighborMarginal': number of marginal nearest neighbors
considered if polybasisMarginal allows; defaults to -1;
- 'interpRcondMarginal': tolerance for marginal interpolation;
defaults to None.
Defaults to empty dict.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
directionPivot: Pivot components.
mus: Array of snapshot parameters.
musPivot: Array of pivot snapshot parameters.
musMarginal: Array of marginal snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'matchingWeight': weight for pole matching optimization;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
- 'cutOffType': rule for tolerance computation for parasitic poles;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation;
- 'MMarginal': degree of marginal interpolant;
- 'polydegreetypeMarginal': type of polynomial degree for marginal;
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant;
- 'nNearestNeighborMarginal': number of marginal nearest neighbors
considered if polybasisMarginal allows;
- 'interpRcondMarginal': tolerance for marginal interpolation.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator.
POD: Whether to compute POD of snapshots.
matchingWeight: Weight for pole matching optimization.
cutOffTolerance: Tolerance for ignoring parasitic poles.
cutOffType: Rule for tolerance computation for parasitic poles.
S: Total number of pivot samples current approximant relies upon.
samplerPivot: Pivot sample point generator.
SMarginal: Total number of marginal samples current approximant relies
upon.
samplerMarginal: Marginal sample point generator.
polybasisMarginal: Type of polynomial basis for marginal interpolation.
MMarginal: Degree of marginal interpolant.
polydegreetypeMarginal: Type of polynomial degree for marginal.
radialDirectionalWeightsMarginal: Radial basis weights for marginal
interpolant.
nNearestNeighborMarginal: Number of marginal nearest neighbors
considered if polybasisMarginal allows.
interpRcondMarginal: Tolerance for marginal interpolation.
muBoundsPivot: list of bounds for pivot parameter values.
muBoundsMarginal: list of bounds for marginal parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
def __init__(self, HFEngine:HFEng, mu0 : paramVal = None,
directionPivot : ListAny = [0],
approxParameters : DictAny = {}, verbosity : int = 10,
timestamp : bool = True):
self._preInit()
if len(directionPivot) > 1:
raise RROMPyException(("Exactly 1 pivot parameter allowed in pole "
"matching."))
from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
QSBase = QS([[0], [1]], "UNIFORM")
self._addParametersToList(["matchingWeight", "cutOffTolerance",
"cutOffType", "polybasisMarginal",
"MMarginal", "polydegreetypeMarginal",
"radialDirectionalWeightsMarginal",
"nNearestNeighborMarginal",
"interpRcondMarginal"],
[1, np.inf, "MAGNITUDE", "MONOMIAL", 0,
"TOTAL", 1, -1, -1],
["samplerPivot", "SMarginal",
"samplerMarginal"], [QSBase, [1], QSBase])
del QS
self._directionPivot = directionPivot
super().__init__(HFEngine = HFEngine, mu0 = mu0,
approxParameters = approxParameters,
verbosity = verbosity, timestamp = timestamp)
self._postInit()
def setupSampling(self):
"""Setup sampling engine."""
RROMPyAssert(self._mode, message = "Cannot setup sampling engine.")
if not hasattr(self, "_POD") or self._POD is None: return
if self.POD:
SamplingEngine = SamplingEnginePivotedPOD
else:
SamplingEngine = SamplingEnginePivoted
self.samplingEngine = SamplingEngine(self.HFEngine,
self.directionPivot,
verbosity = self.verbosity)
@property
def matchingWeight(self):
"""Value of matchingWeight."""
return self._matchingWeight
@matchingWeight.setter
def matchingWeight(self, matchingWeight):
self._matchingWeight = matchingWeight
self._approxParameters["matchingWeight"] = self.matchingWeight
@property
def cutOffTolerance(self):
"""Value of cutOffTolerance."""
return self._cutOffTolerance
@cutOffTolerance.setter
def cutOffTolerance(self, cutOffTolerance):
self._cutOffTolerance = cutOffTolerance
self._approxParameters["cutOffTolerance"] = self.cutOffTolerance
@property
def cutOffType(self):
"""Value of cutOffType."""
return self._cutOffType
@cutOffType.setter
def cutOffType(self, cutOffType):
try:
cutOffType = cutOffType.upper().strip().replace(" ","")
if cutOffType not in ["MAGNITUDE", "POTENTIAL"]:
raise RROMPyException("Prescribed cutOffType not recognized.")
self._cutOffType = cutOffType
except:
RROMPyWarning(("Prescribed cutOffType not recognized. Overriding "
"to 'MAGNITUDE'."))
self._cutOffType = "MAGNITUDE"
self._approxParameters["cutOffType"] = self.cutOffType
@property
def SMarginal(self):
"""Value of SMarginal."""
return self._SMarginal
@SMarginal.setter
def SMarginal(self, SMarginal):
if SMarginal <= 0:
raise RROMPyException("SMarginal must be positive.")
if hasattr(self, "_SMarginal") and self._SMarginal is not None:
Sold = self.SMarginal
else: Sold = -1
self._SMarginal = SMarginal
self._approxParameters["SMarginal"] = self.SMarginal
if Sold != self.SMarginal: self.resetSamples()
@property
def polybasisMarginal(self):
"""Value of polybasisMarginal."""
return self._polybasisMarginal
@polybasisMarginal.setter
def polybasisMarginal(self, polybasisMarginal):
try:
polybasisMarginal = polybasisMarginal.upper().strip().replace(" ",
"")
if polybasisMarginal not in ppb + rbpb + mlspb:
raise RROMPyException(
"Prescribed marginal polybasis not recognized.")
self._polybasisMarginal = polybasisMarginal
except:
RROMPyWarning(("Prescribed marginal polybasis not recognized. "
"Overriding to 'MONOMIAL'."))
self._polybasisMarginal = "MONOMIAL"
self._approxParameters["polybasisMarginal"] = self.polybasisMarginal
@property
def MMarginal(self):
"""Value of MMarginal."""
return self._MMarginal
@MMarginal.setter
def MMarginal(self, MMarginal):
if MMarginal < 0:
raise RROMPyException("MMarginal must be non-negative.")
self._MMarginal = MMarginal
self._approxParameters["MMarginal"] = self.MMarginal
@property
def polydegreetypeMarginal(self):
"""Value of polydegreetypeMarginal."""
return self._polydegreetypeMarginal
@polydegreetypeMarginal.setter
def polydegreetypeMarginal(self, polydegreetypeM):
try:
polydegreetypeM = polydegreetypeM.upper().strip().replace(" ","")
if polydegreetypeM not in ["TOTAL", "FULL"]:
raise RROMPyException(("Prescribed polydegreetypeMarginal not "
"recognized."))
self._polydegreetypeMarginal = polydegreetypeM
except:
RROMPyWarning(("Prescribed polydegreetypeMarginal not recognized. "
"Overriding to 'TOTAL'."))
self._polydegreetypeMarginal = "TOTAL"
self._approxParameters["polydegreetypeMarginal"] = (
self.polydegreetypeMarginal)
@property
def radialDirectionalWeightsMarginal(self):
"""Value of radialDirectionalWeightsMarginal."""
return self._radialDirectionalWeightsMarginal
@radialDirectionalWeightsMarginal.setter
def radialDirectionalWeightsMarginal(self, radialDirWeightsMarginal):
self._radialDirectionalWeightsMarginal = radialDirWeightsMarginal
self._approxParameters["radialDirectionalWeightsMarginal"] = (
self.radialDirectionalWeightsMarginal)
@property
def nNearestNeighborMarginal(self):
"""Value of nNearestNeighborMarginal."""
return self._nNearestNeighborMarginal
@nNearestNeighborMarginal.setter
def nNearestNeighborMarginal(self, nNearestNeighborMarginal):
self._nNearestNeighborMarginal = nNearestNeighborMarginal
self._approxParameters["nNearestNeighborMarginal"] = (
self.nNearestNeighborMarginal)
@property
def interpRcondMarginal(self):
"""Value of interpRcondMarginal."""
return self._interpRcondMarginal
@interpRcondMarginal.setter
def interpRcondMarginal(self, interpRcondMarginal):
self._interpRcondMarginal = interpRcondMarginal
self._approxParameters["interpRcondMarginal"] = (
self.interpRcondMarginal)
@property
def directionPivot(self):
"""Value of directionPivot. Its assignment may reset snapshots."""
return self._directionPivot
@directionPivot.setter
def directionPivot(self, directionPivot):
if hasattr(self, '_directionPivot'):
directionPivotOld = copy(self.directionPivot)
else:
directionPivotOld = None
if (directionPivotOld is None
or len(directionPivot) != len(directionPivotOld)
or not directionPivot == directionPivotOld):
self.resetSamples()
self._directionPivot = directionPivot
@property
def directionMarginal(self):
return [x for x in range(self.HFEngine.npar) \
if x not in self.directionPivot]
@property
def nparPivot(self):
return len(self.directionPivot)
@property
def nparMarginal(self):
return self.npar - self.nparPivot
@property
def rescalingExpPivot(self):
return [self.HFEngine.rescalingExp[x] for x in self.directionPivot]
@property
def rescalingExpMarginal(self):
return [self.HFEngine.rescalingExp[x] for x in self.directionMarginal]
@property
def muBoundsPivot(self):
"""Value of muBoundsPivot."""
return self.samplerPivot.lims
@property
def muBoundsMarginal(self):
"""Value of muBoundsMarginal."""
return self.samplerMarginal.lims
@property
def samplerPivot(self):
"""Value of samplerPivot."""
return self._samplerPivot
@samplerPivot.setter
def samplerPivot(self, samplerPivot):
if 'generatePoints' not in dir(samplerPivot):
raise RROMPyException("Pivot sampler type not recognized.")
if hasattr(self, '_samplerPivot') and self._samplerPivot is not None:
samplerOld = self.samplerPivot
self._samplerPivot = samplerPivot
self._approxParameters["samplerPivot"] = self.samplerPivot.__str__()
if not 'samplerOld' in locals() or samplerOld != self.samplerPivot:
self.resetSamples()
@property
def samplerMarginal(self):
"""Value of samplerMarginal."""
return self._samplerMarginal
@samplerMarginal.setter
def samplerMarginal(self, samplerMarginal):
if 'generatePoints' not in dir(samplerMarginal):
raise RROMPyException("Marginal sampler type not recognized.")
if (hasattr(self, '_samplerMarginal')
and self._samplerMarginal is not None):
samplerOld = self.samplerMarginal
self._samplerMarginal = samplerMarginal
self._approxParameters["samplerMarginal"] = (
self.samplerMarginal.__str__())
if not 'samplerOld' in locals() or samplerOld != self.samplerMarginal:
self.resetSamples()
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musMUniqueCN = None
self._derMIdxs = None
self._reorderM = None
def setSamples(self, samplingEngine):
"""Copy samplingEngine and samples."""
super().setSamples(samplingEngine)
self.mus = copy(self.samplingEngine[0].mus)
for sEj in self.samplingEngine[1:]:
self.mus.append(sEj.mus)
def computeSnapshots(self):
"""Compute snapshots of solution map."""
RROMPyAssert(self._mode,
message = "Cannot start snapshot computation.")
if self.samplingEngine.nsamplesTot != self.S * self.SMarginal:
self.computeScaleFactor()
self.resetSamples()
vbMng(self, "INIT", "Starting computation of snapshots.", 5)
self.HFEngine.buildA()
self.HFEngine.buildb()
self.musPivot = self.samplerPivot.generatePoints(self.S)
self.musMarginal = self.samplerMarginal.generatePoints(
self.SMarginal)
self.mus = emptyParameterList()
self.mus.reset((self.S * self.SMarginal, self.HFEngine.npar))
self.samplingEngine.resetHistory(self.SMarginal)
for j, muMarg in enumerate(self.musMarginal):
for k in range(j * self.S, (j + 1) * self.S):
self.mus.data[k, self.directionPivot] = (
self.musPivot[k - j * self.S].data)
self.mus.data[k, self.directionMarginal] = muMarg.data
self.samplingEngine.iterSample(self.musPivot, self.musMarginal)
if self.POD:
self.samplingEngine.coalesceSamples(self.interpRcondMarginal)
else:
self.samplingEngine.coalesceSamples()
vbMng(self, "DEL", "Done computing snapshots.", 5)
def _setupMarginalInterpolationIndices(self):
"""Setup parameters for polyvander."""
RROMPyAssert(self._mode,
message = "Cannot setup interpolation indices.")
if (self._musMUniqueCN is None
or len(self._reorderM) != len(self.musMarginal)):
self._musMUniqueCN, musMIdxsTo, musMIdxs, musMCount = (
self.trainedModel.centerNormalizeMarginal(self.musMarginal)\
.unique(return_index = True, return_inverse = True,
return_counts = True))
self._musMUnique = self.musMarginal[musMIdxsTo]
self._derMIdxs = [None] * len(self._musMUniqueCN)
self._reorderM = np.empty(len(musMIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musMCount):
self._derMIdxs[j] = nextDerivativeIndices([],
self.nparMarginal, cnt)
jIdx = np.nonzero(musMIdxs == j)[0]
self._reorderM[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupMarginalInterp(self):
"""Compute marginal interpolator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
vbMng(self, "INIT", "Starting computation of marginal interpolator.",
7)
self._setupMarginalInterpolationIndices()
if self.polydegreetypeMarginal == "TOTAL":
cfun = totalDegreeN
else:
cfun = fullDegreeN
MM = copy(self.MMarginal)
while len(self.musMarginal) < cfun(MM, self.nparMarginal): MM -= 1
if MM < self.MMarginal:
RROMPyWarning(
("MMarginal too large compared to SMarginal. "
"Reducing MMarginal by {}").format(self.MMarginal - MM))
self.MMarginal = MM
mI = []
for j in range(len(self.musMarginal)):
canonicalj = 1. * (np.arange(len(self.musMarginal)) == j)
self._MMarginal = MM
while self.MMarginal >= 0:
if self.polybasisMarginal in ppb:
p = PI()
wellCond, msg = p.setupByInterpolation(
self._musMUniqueCN, canonicalj, self.MMarginal,
self.polybasisMarginal, self.verbosity >= 5,
self.polydegreetypeMarginal == "TOTAL",
{"derIdxs": self._derMIdxs,
"reorder": self._reorderM,
"scl": np.power(self.scaleFactorMarginal, -1.)},
{"rcond": self.interpRcondMarginal})
elif self.polybasisMarginal in rbpb:
p = RBI()
wellCond, msg = p.setupByInterpolation(
self._musMUniqueCN, canonicalj, self.MMarginal,
self.polybasisMarginal,
self.radialDirectionalWeightsMarginal,
self.verbosity >= 5,
self.polydegreetypeMarginal == "TOTAL",
{"derIdxs": self._derMIdxs,
"reorder": self._reorderM,
"scl": np.power(self.scaleFactorMarginal, -1.),
"nNearestNeighbor" : self.nNearestNeighborMarginal},
{"rcond": self.interpRcondMarginal})
else:# if self.polybasisMarginal in mlspb:
p = MLSI()
wellCond, msg = p.setupByInterpolation(
self._musMUniqueCN, canonicalj, self.MMarginal,
self.polybasisMarginal,
self.radialDirectionalWeightsMarginal,
self.verbosity >= 5,
self.polydegreetypeMarginal == "TOTAL",
{"derIdxs": self._derMIdxs,
"reorder": self._reorderM,
"scl": np.power(self.scaleFactorMarginal, -1.),
"nNearestNeighbor" : self.nNearestNeighborMarginal})
vbMng(self, "MAIN", msg, 5)
if wellCond: break
RROMPyWarning(("Polyfit is poorly conditioned. Reducing "
"MMarginal by 1."))
self.MMarginal = self.MMarginal - 1
mI = mI + [copy(p)]
vbMng(self, "DEL", "Done computing marginal interpolator.", 7)
return mI
def normApprox(self, mu:paramList) -> float:
"""
Compute norm of approximant at arbitrary parameter.
Args:
mu: Target parameter.
Returns:
Target norm of approximant.
"""
if not self.POD: return super().normApprox(mu)
return np.linalg.norm(self.getApproxReduced(mu).data, axis = 0)
def computeScaleFactor(self):
"""Compute parameter rescaling factor."""
RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.")
self.scaleFactorPivot = .5 * np.abs(
self.muBoundsPivot[0] ** self.rescalingExpPivot
- self.muBoundsPivot[1] ** self.rescalingExpPivot)
self.scaleFactorMarginal = .5 * np.abs(
self.muBoundsMarginal[0] ** self.rescalingExpMarginal
- self.muBoundsMarginal[1] ** self.rescalingExpMarginal)
self.scaleFactor = np.empty(self.npar)
self.scaleFactor[self.directionPivot] = self.scaleFactorPivot
self.scaleFactor[self.directionMarginal] = self.scaleFactorMarginal

Event Timeline