diff --git a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py
index be60369..5337ae8 100644
--- a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py
+++ b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py
@@ -1,657 +1,657 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
from rrompy.reduction_methods.base.generic_approximant import (
GenericApproximant)
from rrompy.utilities.base.data_structures import purgeDict
from rrompy.sampling import SamplingEnginePivoted, SamplingEnginePivotedPOD
from rrompy.utilities.poly_fitting.polynomial import polybases as ppb
from rrompy.utilities.poly_fitting.radial_basis import polybases as rbpb
from rrompy.utilities.poly_fitting.piecewise_linear import sparsekinds as sk
from rrompy.utilities.base.types import paramList, ListAny
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical.degree import reduceDegreeN
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList
__all__ = ['GenericPivotedApproximantNoMatch', 'GenericPivotedApproximant']
class GenericPivotedApproximantBase(GenericApproximant):
def __init__(self, directionPivot:ListAny, *args, **kwargs):
self._preInit()
if len(directionPivot) > 1:
raise RROMPyException(("Exactly 1 pivot parameter allowed in pole "
"matching."))
from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
from rrompy.parameter.parameter_sampling import SparseGridSampler as SG
QSBase = QS([[0.], [1.]], "UNIFORM")
SGBase = SG([[0.], [1.]], "UNIFORM")
self._addParametersToList(["cutOffTolerance",
"radialDirectionalWeightsMarginal"],
[np.inf, [1.]], ["samplerPivot", "SMarginal",
"samplerMarginal"], [QSBase, [1], SGBase])
del QS, SG
self._directionPivot = directionPivot
super().__init__(*args, **kwargs)
self._postInit()
def setupSampling(self):
"""Setup sampling engine."""
RROMPyAssert(self._mode, message = "Cannot setup sampling engine.")
if not hasattr(self, "_POD") or self._POD is None: return
if self.POD:
SamplingEngine = SamplingEnginePivotedPOD
else:
SamplingEngine = SamplingEnginePivoted
self.samplingEngine = SamplingEngine(self.HFEngine,
self.directionPivot,
sample_state = self.approx_state,
verbosity = self.verbosity)
def initializeModelData(self, datadict):
if "directionPivot" in datadict.keys():
from .trained_model.trained_model_pivoted_data import (
TrainedModelPivotedData)
return (TrainedModelPivotedData(datadict["mu0"], datadict["mus"],
datadict.pop("projMat"),
datadict["scaleFactor"],
datadict.pop("rescalingExp"),
datadict["directionPivot"]),
["mu0", "scaleFactor", "directionPivot", "mus"])
else:
return super().initializeModelData(datadict)
@property
def npar(self):
"""Number of parameters."""
if hasattr(self, "_temporaryPivot"): return self.nparPivot
return super().npar
@property
def mus(self):
"""Value of mus. Its assignment may reset snapshots."""
return self._mus
@mus.setter
def mus(self, mus):
mus = checkParameterList(mus)[0]
musOld = copy(self.mus) if hasattr(self, '_mus') else None
if (musOld is None or len(mus) != len(musOld) or not mus == musOld):
self.resetSamples()
self._mus = mus
@property
def musMarginal(self):
"""Value of musMarginal. Its assignment may reset snapshots."""
return self._musMarginal
@musMarginal.setter
def musMarginal(self, musMarginal):
musMarginal = checkParameterList(musMarginal)[0]
if hasattr(self, '_musMarginal'):
musMOld = copy(self.musMarginal)
else:
musMOld = None
if (musMOld is None or len(musMarginal) != len(musMOld)
or not musMarginal == musMOld):
self.resetSamples()
self._musMarginal = musMarginal
@property
def cutOffTolerance(self):
"""Value of cutOffTolerance."""
return self._cutOffTolerance
@cutOffTolerance.setter
def cutOffTolerance(self, cutOffTolerance):
self._cutOffTolerance = cutOffTolerance
self._approxParameters["cutOffTolerance"] = self.cutOffTolerance
@property
def SMarginal(self):
"""Value of SMarginal."""
return self._SMarginal
@SMarginal.setter
def SMarginal(self, SMarginal):
if SMarginal <= 0:
raise RROMPyException("SMarginal must be positive.")
if hasattr(self, "_SMarginal") and self._SMarginal is not None:
Sold = self.SMarginal
else: Sold = -1
self._SMarginal = SMarginal
self._approxParameters["SMarginal"] = self.SMarginal
if Sold != self.SMarginal: self.resetSamples()
@property
def radialDirectionalWeightsMarginal(self):
"""Value of radialDirectionalWeightsMarginal."""
return self._radialDirectionalWeightsMarginal
@radialDirectionalWeightsMarginal.setter
def radialDirectionalWeightsMarginal(self, radialDirWeightsMarginal):
if not hasattr(radialDirWeightsMarginal, "__len__"):
radialDirWeightsMarginal = [radialDirWeightsMarginal]
self._radialDirectionalWeightsMarginal = radialDirWeightsMarginal
self._approxParameters["radialDirectionalWeightsMarginal"] = (
self.radialDirectionalWeightsMarginal)
@property
def directionPivot(self):
"""Value of directionPivot. Its assignment may reset snapshots."""
return self._directionPivot
@directionPivot.setter
def directionPivot(self, directionPivot):
if hasattr(self, '_directionPivot'):
directionPivotOld = copy(self.directionPivot)
else:
directionPivotOld = None
if (directionPivotOld is None
or len(directionPivot) != len(directionPivotOld)
or not directionPivot == directionPivotOld):
self.resetSamples()
self._directionPivot = directionPivot
@property
def directionMarginal(self):
return [x for x in range(self.HFEngine.npar) \
if x not in self.directionPivot]
@property
def nparPivot(self):
return len(self.directionPivot)
@property
def nparMarginal(self):
return self.npar - self.nparPivot
@property
def rescalingExpPivot(self):
return [self.HFEngine.rescalingExp[x] for x in self.directionPivot]
@property
def rescalingExpMarginal(self):
return [self.HFEngine.rescalingExp[x] for x in self.directionMarginal]
@property
def muBounds(self):
"""Value of muBounds."""
return self.samplerPivot.lims
@property
def muBoundsMarginal(self):
"""Value of muBoundsMarginal."""
return self.samplerMarginal.lims
@property
def sampler(self):
"""Proxy of samplerPivot."""
return self._samplerPivot
@property
def samplerPivot(self):
"""Value of samplerPivot."""
return self._samplerPivot
@samplerPivot.setter
def samplerPivot(self, samplerPivot):
if 'generatePoints' not in dir(samplerPivot):
raise RROMPyException("Pivot sampler type not recognized.")
if hasattr(self, '_samplerPivot') and self._samplerPivot is not None:
samplerOld = self.samplerPivot
self._samplerPivot = samplerPivot
self._approxParameters["samplerPivot"] = self.samplerPivot
if not 'samplerOld' in locals() or samplerOld != self.samplerPivot:
self.resetSamples()
@property
def samplerMarginal(self):
"""Value of samplerMarginal."""
return self._samplerMarginal
@samplerMarginal.setter
def samplerMarginal(self, samplerMarginal):
if 'generatePoints' not in dir(samplerMarginal):
raise RROMPyException("Marginal sampler type not recognized.")
if (hasattr(self, '_samplerMarginal')
and self._samplerMarginal is not None):
samplerOld = self.samplerMarginal
self._samplerMarginal = samplerMarginal
self._approxParameters["samplerMarginal"] = self.samplerMarginal
if not 'samplerOld' in locals() or samplerOld != self.samplerMarginal:
self.resetSamples()
def setSamples(self, samplingEngine):
"""Copy samplingEngine and samples."""
self.mus = copy(samplingEngine.mus[0])
for sEj in samplingEngine.mus[1:]:
self.mus.append(sEj)
super().setSamples(samplingEngine)
def computeScaleFactor(self):
"""Compute parameter rescaling factor."""
self.scaleFactorPivot = .5 * np.abs(
self.muBounds[0] ** self.rescalingExpPivot
- self.muBounds[1] ** self.rescalingExpPivot)
self.scaleFactorMarginal = .5 * np.abs(
self.muBoundsMarginal[0] ** self.rescalingExpMarginal
- self.muBoundsMarginal[1] ** self.rescalingExpMarginal)
self.scaleFactor = np.empty(self.npar)
self.scaleFactor[self.directionPivot] = self.scaleFactorPivot
self.scaleFactor[self.directionMarginal] = self.scaleFactorMarginal
def normApprox(self, mu:paramList) -> float:
_PODOld = self.POD
self._POD = False
result = super().normApprox(mu)
self._POD = _PODOld
return result
def loadTrainedModel(self, filename:str):
"""Load trained reduced model from file."""
super().loadTrainedModel(filename)
self._musMarginal = self.trainedModel.data.musMarginal
class GenericPivotedApproximantNoMatch(GenericPivotedApproximantBase):
"""
ROM pivoted approximant (without pole matching) computation for parametric
problems (ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
directionPivot(optional): Pivot components. Defaults to [0].
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'scaleFactorDer': scaling factors for derivative computation;
defaults to 'AUTO';
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
defaults to np.inf;
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator;
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant; defaults to 1.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
directionPivot: Pivot components.
mus: Array of snapshot parameters.
musMarginal: Array of marginal snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'scaleFactorDer': scaling factors for derivative computation;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
scaleFactorDer: Scaling factors for derivative computation.
cutOffTolerance: Tolerance for ignoring parasitic poles.
S: Total number of pivot samples current approximant relies upon.
samplerPivot: Pivot sample point generator.
SMarginal: Total number of marginal samples current approximant relies
upon.
samplerMarginal: Marginal sample point generator.
radialDirectionalWeightsMarginal: Radial basis weights for marginal
interpolant.
muBounds: list of bounds for pivot parameter values.
muBoundsMarginal: list of bounds for marginal parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
@property
def tModelType(self):
from .trained_model.trained_model_pivoted_rational_nomatch import (
TrainedModelPivotedRationalNoMatch)
return TrainedModelPivotedRationalNoMatch
def _finalizeMarginalization(self):
vbMng(self, "INIT", "Recompressing by cut off.", 10)
msg = self.trainedModel.recompressByCutOff(self.cutOffTolerance,
self.samplerPivot.normalFoci(),
self.samplerPivot.groundPotential())
vbMng(self, "DEL", "Done recompressing." + msg, 10)
self.trainedModel.setupMarginalInterp(
self.radialDirectionalWeightsMarginal)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
class GenericPivotedApproximant(GenericPivotedApproximantBase):
"""
ROM pivoted approximant (with pole matching) computation for parametric
problems (ABSTRACT).
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
directionPivot(optional): Pivot components. Defaults to [0].
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'scaleFactorDer': scaling factors for derivative computation;
defaults to 'AUTO';
- 'matchingWeight': weight for pole matching optimization; defaults
to 1;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
defaults to np.inf;
- 'cutOffSharedRatio': required ratio of marginal points to share
resonance in cut off strategy; defaults to 1.;
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation; allowed values include 'MONOMIAL_*',
'CHEBYSHEV_*', 'LEGENDRE_*', 'NEARESTNEIGHBOR', and
'PIECEWISE_LINEAR_*'; defaults to 'MONOMIAL';
- 'paramsMarginal': dictionary of parameters for marginal
interpolation; include:
. 'MMarginal': degree of marginal interpolant; defaults to
'AUTO', i.e. maximum allowed; not for 'NEARESTNEIGHBOR' or
'PIECEWISE_LINEAR_*';
. 'nNeighborsMarginal': number of marginal nearest neighbors;
defaults to 1; only for 'NEARESTNEIGHBOR' or
'PIECEWISE_LINEAR_*';
. 'polydegreetypeMarginal': type of polynomial degree for
marginal; defaults to 'TOTAL'; not for 'NEARESTNEIGHBOR' or
'PIECEWISE_LINEAR_*';
. 'interpRcondMarginal': tolerance for marginal interpolation;
defaults to None; not for 'NEARESTNEIGHBOR'.
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant; defaults to 1.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
directionPivot: Pivot components.
mus: Array of snapshot parameters.
musMarginal: Array of marginal snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'scaleFactorDer': scaling factors for derivative computation;
- 'matchingWeight': weight for pole matching optimization;
- 'cutOffTolerance': tolerance for ignoring parasitic poles;
- 'cutOffSharedRatio': required ratio of marginal points to share
resonance in cut off strategy;
- 'polybasisMarginal': type of polynomial basis for marginal
interpolation;
- 'paramsMarginal': dictionary of parameters for marginal
interpolation; include:
. 'MMarginal': degree of marginal interpolant;
. 'nNeighborsMarginal': number of marginal nearest neighbors;
. 'polydegreetypeMarginal': type of polynomial degree for
marginal;
. 'interpRcondMarginal': tolerance for marginal interpolation.
- 'radialDirectionalWeightsMarginal': radial basis weights for
marginal interpolant.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of pivot samples current approximant relies
upon;
- 'samplerPivot': pivot sample point generator;
- 'SMarginal': total number of marginal samples current approximant
relies upon;
- 'samplerMarginal': marginal sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
scaleFactorDer: Scaling factors for derivative computation.
matchingWeight: Weight for pole matching optimization.
cutOffTolerance: Tolerance for ignoring parasitic poles.
cutOffSharedRatio: Required ratio of marginal points to share resonance
in cut off strategy.
S: Total number of pivot samples current approximant relies upon.
samplerPivot: Pivot sample point generator.
SMarginal: Total number of marginal samples current approximant relies
upon.
samplerMarginal: Marginal sample point generator.
polybasisMarginal: Type of polynomial basis for marginal interpolation.
paramsMarginal: Dictionary of parameters for marginal interpolation.
radialDirectionalWeightsMarginal: Radial basis weights for marginal
interpolant.
muBounds: list of bounds for pivot parameter values.
muBoundsMarginal: list of bounds for marginal parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
"""
def __init__(self, *args, **kwargs):
self._preInit()
self._addParametersToList(["matchingWeight", "cutOffSharedRatio",
"polybasisMarginal", "paramsMarginal"],
[1., 1., "MONOMIAL", {}])
self.parameterMarginalList = ["MMarginal", "nNeighborsMarginal",
"polydegreetypeMarginal",
"interpRcondMarginal"]
super().__init__(*args, **kwargs)
self._postInit()
@property
def tModelType(self):
from .trained_model.trained_model_pivoted_rational import (
TrainedModelPivotedRational)
return TrainedModelPivotedRational
@property
def matchingWeight(self):
"""Value of matchingWeight."""
return self._matchingWeight
@matchingWeight.setter
def matchingWeight(self, matchingWeight):
self._matchingWeight = matchingWeight
self._approxParameters["matchingWeight"] = self.matchingWeight
@property
def cutOffSharedRatio(self):
"""Value of cutOffSharedRatio."""
return self._cutOffSharedRatio
@cutOffSharedRatio.setter
def cutOffSharedRatio(self, cutOffSharedRatio):
if cutOffSharedRatio > 1.:
RROMPyWarning("Cut off shared ratio too large. Clipping to 1.")
cutOffSharedRatio = 1.
elif cutOffSharedRatio < 0.:
RROMPyWarning("Cut off shared ratio too small. Clipping to 0.")
cutOffSharedRatio = 0.
self._cutOffSharedRatio = cutOffSharedRatio
self._approxParameters["cutOffSharedRatio"] = self.cutOffSharedRatio
@property
def polybasisMarginal(self):
"""Value of polybasisMarginal."""
return self._polybasisMarginal
@polybasisMarginal.setter
def polybasisMarginal(self, polybasisMarginal):
try:
polybasisMarginal = polybasisMarginal.upper().strip().replace(" ",
"")
if polybasisMarginal not in ppb + rbpb + ["NEARESTNEIGHBOR"] + sk:
raise RROMPyException(
"Prescribed marginal polybasis not recognized.")
self._polybasisMarginal = polybasisMarginal
except:
RROMPyWarning(("Prescribed marginal polybasis not recognized. "
"Overriding to 'MONOMIAL'."))
self._polybasisMarginal = "MONOMIAL"
self._approxParameters["polybasisMarginal"] = self.polybasisMarginal
@property
def paramsMarginal(self):
"""Value of paramsMarginal."""
return self._paramsMarginal
@paramsMarginal.setter
def paramsMarginal(self, paramsMarginal):
paramsMarginal = purgeDict(paramsMarginal, self.parameterMarginalList,
dictname = self.name() + ".paramsMarginal",
baselevel = 1)
keyList = list(paramsMarginal.keys())
if not hasattr(self, "_paramsMarginal"): self._paramsMarginal = {}
if "MMarginal" in keyList:
MMarg = paramsMarginal["MMarginal"]
elif ("MMarginal" in self.paramsMarginal
and not hasattr(self, "_MMarginal_isauto")):
MMarg = self.paramsMarginal["MMarginal"]
else:
MMarg = "AUTO"
if isinstance(MMarg, str):
MMarg = MMarg.strip().replace(" ","")
if "-" not in MMarg: MMarg = MMarg + "-0"
self._MMarginal_isauto = True
self._MMarginal_shift = int(MMarg.split("-")[-1])
MMarg = 0
if MMarg < 0:
raise RROMPyException("MMarginal must be non-negative.")
self._paramsMarginal["MMarginal"] = MMarg
if "nNeighborsMarginal" in keyList:
self._paramsMarginal["nNeighborsMarginal"] = max(1,
paramsMarginal["nNeighborsMarginal"])
elif "nNeighborsMarginal" not in self.paramsMarginal:
self._paramsMarginal["nNeighborsMarginal"] = 1
if "polydegreetypeMarginal" in keyList:
try:
polydegtypeM = paramsMarginal["polydegreetypeMarginal"]\
.upper().strip().replace(" ","")
if polydegtypeM not in ["TOTAL", "FULL"]:
raise RROMPyException(("Prescribed polydegreetypeMarginal "
"not recognized."))
self._paramsMarginal["polydegreetypeMarginal"] = polydegtypeM
except:
RROMPyWarning(("Prescribed polydegreetypeMarginal not "
"recognized. Overriding to 'TOTAL'."))
self._paramsMarginal["polydegreetypeMarginal"] = "TOTAL"
elif "polydegreetypeMarginal" not in self.paramsMarginal:
self._paramsMarginal["polydegreetypeMarginal"] = "TOTAL"
if "interpRcondMarginal" in keyList:
self._paramsMarginal["interpRcondMarginal"] = (
paramsMarginal["interpRcondMarginal"])
elif "interpRcondMarginal" not in self.paramsMarginal:
self._paramsMarginal["interpRcondMarginal"] = -1
self._approxParameters["paramsMarginal"] = self.paramsMarginal
def _setMMarginalAuto(self):
if (self.polybasisMarginal not in ppb + rbpb
or "MMarginal" not in self.paramsMarginal
or "polydegreetypeMarginal" not in self.paramsMarginal):
raise RROMPyException(("Cannot set MMarginal if "
"polybasisMarginal does not allow it."))
self.paramsMarginal["MMarginal"] = max(0, reduceDegreeN(
len(self.musMarginal), len(self.musMarginal),
self.nparMarginal,
self.paramsMarginal["polydegreetypeMarginal"])
- self._MMarginal_shift)
vbMng(self, "MAIN", ("Automatically setting MMarginal to {}.").format(
self.paramsMarginal["MMarginal"]), 25)
def purgeparamsMarginal(self):
self.paramsMarginal = {}
paramsMbadkeys = []
if self.polybasisMarginal in ppb + rbpb + sk:
paramsMbadkeys += ["nNeighborsMarginal"]
if self.polybasisMarginal in ["NEARESTNEIGHBOR"] + sk:
paramsMbadkeys += ["MMarginal", "polydegreetypeMarginal"]
if hasattr(self, "_MMarginal_isauto"): del self._MMarginal_isauto
if hasattr(self, "_MMarginal_shift"): del self._MMarginal_shift
if self.polybasisMarginal == "NEARESTNEIGHBOR":
paramsMbadkeys += ["interpRcondMarginal"]
for key in paramsMbadkeys:
if key in self._paramsMarginal: del self._paramsMarginal[key]
self._approxParameters["paramsMarginal"] = self.paramsMarginal
def _finalizeMarginalization(self):
vbMng(self, "INIT", "Recompressing by cut off.", 10)
msg = self.trainedModel.recompressByCutOff(self.cutOffTolerance,
self.cutOffSharedRatio,
self.samplerPivot.normalFoci(),
self.samplerPivot.groundPotential())
vbMng(self, "DEL", "Done recompressing." + msg, 10)
if self.polybasisMarginal == "NEARESTNEIGHBOR":
interpPars = [self.paramsMarginal["nNeighborsMarginal"]]
else:
interpPars = [{"rcond":self.paramsMarginal["interpRcondMarginal"]}]
if self.polybasisMarginal in ppb + rbpb:
interpPars = [self.verbosity >= 5,
self.paramsMarginal["polydegreetypeMarginal"] == "TOTAL",
{}] + interpPars
+ extraPar = hasattr(self, "_reduceDegreeNNoWarn")
if self.polybasisMarginal in ppb:
rDWMEff = None
- elif self.polybasisMarginal in sk:
- idxEff = [x for x in range(self.samplerMarginal.npoints)
- if not hasattr(self.trainedModel, "_idxExcl")
- or x not in self.trainedModel._idxExcl]
- rDWMEff = self.samplerMarginal.depth[idxEff]
- else: #if self.polybasisMarginal in rbpb + ["NEARESTNEIGHBOR"]:
+ else: #if self.polybasisMarginal in rbpb + ["NEARESTNEIGHBOR"] + sk:
self.computeScaleFactor()
rDWMEff = [w * f for w, f in zip(
self.radialDirectionalWeightsMarginal,
self.scaleFactorMarginal)]
+ if self.polybasisMarginal in sk:
+ idxEff = [x for x in range(self.samplerMarginal.npoints)
+ if not hasattr(self.trainedModel, "_idxExcl")
+ or x not in self.trainedModel._idxExcl]
+ extraPar = self.samplerMarginal.depth[idxEff]
self.trainedModel.setupMarginalInterp(self, interpPars,
- hasattr(self, "_MMarginal_isauto"),
- rDWMEff,
- hasattr(self, "_reduceDegreeNNoWarn"))
+ hasattr(self, "_MMarginal_isauto"),
+ rDWMEff, extraPar)
self.trainedModel.data.approxParameters = copy(self.approxParameters)
diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py
index b5a366b..979dab3 100644
--- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py
+++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py
@@ -1,314 +1,310 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
from .trained_model_pivoted_rational_nomatch import (
TrainedModelPivotedRationalNoMatch)
from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal,
paramList, HFEng)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical.point_matching import (pointMatching,
chordalMetricAdjusted, potential)
from rrompy.utilities.numerical.degree import reduceDegreeN
from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.poly_fitting.heaviside import (heavisideUniformShape,
rational2heaviside,
HeavisideInterpolator as HI)
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.poly_fitting.piecewise_linear import (
PiecewiseLinearInterpolator as PLI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList
__all__ = ['TrainedModelPivotedRational']
class TrainedModelPivotedRational(TrainedModelPivotedRationalNoMatch):
"""
ROM approximant evaluation for pivoted approximants based on interpolation
of rational approximants (with pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def centerNormalizeMarginal(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal.
Returns:
Normalized parameter.
"""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
if mu0 is None: mu0 = self.data.mu0Marginal
rad = ((mu ** self.data.rescalingExpMarginal
- mu0 ** self.data.rescalingExpMarginal)
/ self.data.scaleFactorMarginal)
return rad
def setupMarginalInterp(self, approx, interpPars:ListAny,
MMAuto:bool, rDWM : Np1D = None,
- noWarnReduceAuto : bool = True):
+ extraPar = True):
vbMng(self, "INIT", "Starting computation of marginal interpolator.",
12)
musMCN = self.centerNormalizeMarginal(self.data.musMarginal)
+ nM = len(musMCN)
pbM = approx.polybasisMarginal
- self.data.marginalInterp = []
for ipts, pts in enumerate(self.data.suppEffPts):
if pbM in ppb:
p = PI()
elif pbM in rbpb:
p = RBI()
- elif pbM == "NEARESTNEIGHBOR":
- p = NNI()
- else: #if pbM in sparsekinds:
- pllims = [[-1.] * self.data.nparMarginal,
- [1.] * self.data.nparMarginal]
- p = PLI()
+ else: # #if pbM in sparsekinds + ["NEARESTNEIGHBOR"]:
+ if ipts > 0 or pbM == "NEARESTNEIGHBOR":
+ p = NNI()
+ else: #if ipts == 0 or pbM in sparsekinds:
+ pllims = [[-1.] * self.data.nparMarginal,
+ [1.] * self.data.nparMarginal]
+ p = PLI()
if len(pts) == 0:
- musMCNEff, valsEff = musMCN[[0]], np.ones((1, 0))
- if pbM in ppb + rbpb:
- p.setupByInterpolation(musMCNEff, valsEff, 0, pbM)
- elif pbM == "NEARESTNEIGHBOR":
- p.setupByInterpolation(musMCNEff, valsEff, 1, rDWM)
- else: #if pbM in sparsekinds:
- p.setupByInterpolation(musMCNEff, valsEff, pllims,
- rDWM[[0]], pbM)
- else:
- musMCNEff, valsEff = musMCN[pts], np.eye(len(pts))
- if pbM in ppb + rbpb:
- if MMAuto:
- if ipts > 0:
- verb = approx.verbosity
- approx.verbosity = 0
- _musM = approx.musMarginal
- approx.musMarginal = musMCNEff
- approx._setMMarginalAuto()
- if ipts > 0:
- approx.musMarginal = _musM
- approx.verbosity = verb
- if ipts == 0:
- _MMarginalEff = approx.paramsMarginal["MMarginal"]
- if not MMAuto:
- approx.paramsMarginal["MMarginal"] = _MMarginalEff
- MM = reduceDegreeN(approx.paramsMarginal["MMarginal"],
- len(musMCNEff), self.data.nparMarginal,
- approx.paramsMarginal["polydegreetypeMarginal"])
- if MM < approx.paramsMarginal["MMarginal"]:
- if ipts == 0 and not noWarnReduceAuto:
- RROMPyWarning(
- ("MMarginal too large compared to SMarginal. Reducing "
- "MMarginal by {}").format(
- approx.paramsMarginal["MMarginal"] - MM))
- approx.paramsMarginal["MMarginal"] = MM
- MMEff = approx.paramsMarginal["MMarginal"]
- while MMEff >= 0:
- pParRest = copy(interpPars)
- if pbM in rbpb:
- pParRest = [rDWM] + pParRest
- wellCond, msg = p.setupByInterpolation(musMCNEff,
- valsEff, MMEff,
- pbM, *pParRest)
- vbMng(self, "MAIN", msg, 30)
- if wellCond: break
- vbMng(self, "MAIN",
- ("Polyfit is poorly conditioned. Reducing "
- "MMarginal by 1."), 35)
- MMEff -= 1
- if MMEff < 0:
- raise RROMPyException(("Instability in computation of "
- "interpolant. Aborting."))
- elif pbM == "NEARESTNEIGHBOR":
- p.setupByInterpolation(musMCNEff, valsEff, *interpPars,
- rDWM)
- else: #if pbM in sparsekinds:
- wellCond, msg = p.setupByInterpolation(musMCNEff, valsEff,
- pllims, rDWM[pts],
- pbM, *interpPars)
+ raise RROMPyException("Empty list of support points.")
+ musMCNEff, valsEff = musMCN[pts], np.eye(len(pts))
+ if pbM in ppb + rbpb:
+ if MMAuto:
+ if ipts > 0:
+ verb = approx.verbosity
+ approx.verbosity = 0
+ _musM = approx.musMarginal
+ approx.musMarginal = musMCNEff
+ approx._setMMarginalAuto()
+ if ipts > 0:
+ approx.musMarginal = _musM
+ approx.verbosity = verb
+ if ipts == 0:
+ _MMarginalEff = approx.paramsMarginal["MMarginal"]
+ if not MMAuto:
+ approx.paramsMarginal["MMarginal"] = _MMarginalEff
+ MM = reduceDegreeN(approx.paramsMarginal["MMarginal"],
+ len(musMCNEff), self.data.nparMarginal,
+ approx.paramsMarginal["polydegreetypeMarginal"])
+ if MM < approx.paramsMarginal["MMarginal"]:
+ if ipts == 0 and not extraPar:
+ RROMPyWarning(
+ ("MMarginal too large compared to SMarginal. Reducing "
+ "MMarginal by {}").format(
+ approx.paramsMarginal["MMarginal"] - MM))
+ approx.paramsMarginal["MMarginal"] = MM
+ MMEff = approx.paramsMarginal["MMarginal"]
+ while MMEff >= 0:
+ pParRest = copy(interpPars)
+ if pbM in rbpb:
+ pParRest = [rDWM] + pParRest
+ wellCond, msg = p.setupByInterpolation(musMCNEff,
+ valsEff, MMEff,
+ pbM, *pParRest)
vbMng(self, "MAIN", msg, 30)
- if not wellCond:
- vbMng(self, "MAIN",
- "Warning: polyfit is poorly conditioned.", 35)
- self.data.marginalInterp += [copy(p)]
+ if wellCond: break
+ vbMng(self, "MAIN",
+ ("Polyfit is poorly conditioned. Reducing "
+ "MMarginal by 1."), 35)
+ MMEff -= 1
+ if MMEff < 0:
+ raise RROMPyException(("Instability in computation of "
+ "interpolant. Aborting."))
+ elif ipts > 0 or pbM == "NEARESTNEIGHBOR":
+ nNeigh = interpPars[0] if ipts == 0 else 1
+ p.setupByInterpolation(musMCNEff, valsEff, nNeigh, rDWM)
+ else: #if ipts == 0 and pbM in sparsekinds:
+ wellCond, msg = p.setupByInterpolation(musMCNEff, valsEff,
+ pllims, extraPar[pts],
+ pbM, *interpPars)
+ vbMng(self, "MAIN", msg, 30)
+ if not wellCond:
+ vbMng(self, "MAIN",
+ "Warning: polyfit is poorly conditioned.", 35)
+ if ipts == 0:
+ self.data.marginalInterp = copy(p)
+ self.data.polesEff = [copy(hi.poles) for hi in self.data.HIs]
+ self.data.coeffsEff = [copy(hi.coeffs) for hi in self.data.HIs]
+ else:
+ ptsBad = [i for i in range(nM) if i not in pts]
+ musMCNBad = musMCN[ptsBad]
+ idxBad = np.where(self.data.suppEffIdx == ipts)[0]
+ valMuMBad = p(musMCNBad)
+ pls, cfs = 0., 0.
+ for ij, j in enumerate(pts):
+ pls = pls + np.expand_dims(self.data.polesEff[j][idxBad],
+ - 1) * valMuMBad[ij]
+ cfs = cfs + np.expand_dims(self.data.coeffsEff[j][idxBad],
+ - 1) * valMuMBad[ij]
+ for ij, j in enumerate(ptsBad):
+ self.data.polesEff[j][idxBad] = pls[..., ij]
+ self.data.coeffsEff[j][idxBad] = cfs[..., ij]
if pbM in ppb + rbpb:
approx.paramsMarginal["MMarginal"] = _MMarginalEff
vbMng(self, "DEL", "Done computing marginal interpolator.", 12)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str,
HFEngine:HFEng, matchingWeight : float = 1.,
is_state : bool = True):
"""Initialize Heaviside representation."""
musM = self.data.musMarginal
margAbsDist = np.sum(np.abs(np.repeat(musM.data, len(musM), 0)
- np.tile(musM.data, [len(musM), 1])
), axis = 1).reshape(len(musM), len(musM))
explored = [0]
unexplored = list(range(1, len(musM)))
poles, coeffs = heavisideUniformShape(poles, coeffs)
N = len(poles[0])
for _ in range(1, len(musM)):
minIdx = np.argmin(np.concatenate([margAbsDist[ex, unexplored] \
for ex in explored]))
minIex = explored[minIdx // len(unexplored)]
minIunex = unexplored[minIdx % len(unexplored)]
resex = coeffs[minIex][: N].T
resunex = coeffs[minIunex][: N].T
if matchingWeight != 0 and not self.data._collapsed:
resex = self.data.projMat[:, : resex.shape[0]].dot(resex)
resunex = self.data.projMat[:, : resunex.shape[0]].dot(resunex)
dist = chordalMetricAdjusted(poles[minIex], poles[minIunex],
matchingWeight, resex, resunex,
HFEngine, is_state)
reordering = pointMatching(dist)[1]
poles[minIunex] = poles[minIunex][reordering]
coeffs[minIunex][: N] = coeffs[minIunex][reordering]
explored += [minIunex]
unexplored.remove(minIunex)
super().initializeFromLists(poles, coeffs, basis)
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(N, dtype = int)
def initializeFromRational(self, HFEngine:HFEng,
matchingWeight : float = 1.,
is_state : bool = True):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, basis, HFEngine,
matchingWeight, is_state)
def recompressByCutOff(self, tol:float, shared:float,
foci:List[np.complex], ground:float) -> str:
N = len(self.data.HIs[0].poles)
M = len(self.data.HIs)
mu0 = np.mean(foci)
goodLocPoles = np.array([potential(hi.poles, foci) - ground
<= tol * ground for hi in self.data.HIs])
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(N, dtype = int)
if np.all(np.all(goodLocPoles)): return " No poles erased."
goodGlobPoles = np.sum(goodLocPoles, axis = 0)
goodEnoughPoles = goodGlobPoles >= max(1., 1. * shared * M)
keepPole = np.where(goodEnoughPoles)[0]
halfPole = np.where(np.logical_and(goodEnoughPoles,
goodGlobPoles < M))[0]
removePole = np.where(np.logical_not(goodEnoughPoles))[0]
if len(removePole) > 0:
keepCoeff = np.append(keepPole, np.append([N],
np.arange(N + 1, len(self.data.HIs[0].coeffs))))
for hi in self.data.HIs:
polyCorrection = np.zeros_like(hi.coeffs[0, :])
for j in removePole:
if not np.isinf(hi.poles[j]):
polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j])
if len(hi.coeffs) == N:
hi.coeffs = np.vstack((hi.coeffs, polyCorrection))
else:
hi.coeffs[N, :] += polyCorrection
hi.poles = hi.poles[keepPole]
hi.coeffs = hi.coeffs[keepCoeff, :]
for idxR in halfPole:
pts = np.where(goodLocPoles[:, idxR])[0]
idxEff = len(self.data.suppEffPts)
for idEff, prevPts in enumerate(self.data.suppEffPts):
if len(prevPts) == len(pts):
if np.allclose(prevPts, pts):
idxEff = idEff
break
if idxEff == len(self.data.suppEffPts):
self.data.suppEffPts += [pts]
self.data.suppEffIdx[idxR] = idxEff
self.data.suppEffIdx = self.data.suppEffIdx[keepPole]
return (" Hard-erased {} pole".format(len(removePole))
+ "s" * (len(removePole) != 1)
+ " and soft-erased {} pole".format(len(halfPole))
+ "s" * (len(halfPole) != 1) + ".")
- def _interpolateMarginal(self, mu:paramList, objs:ListAny,
- mIvals : List[Np2D] = None) -> Np2D:
- res = np.zeros(objs[0].shape + (len(mu),), dtype = objs[0].dtype)
- if mIvals is None: muC = self.centerNormalizeMarginal(mu)
- for suppIdx, suppEffPt in enumerate(self.data.suppEffPts):
- i = np.where(self.data.suppEffIdx == suppIdx)[0]
- if suppIdx == 0:
- i = np.append(i, np.arange(len(self.data.suppEffIdx),
- len(res)))
- if len(i) > 0:
- if mIvals is None:
- mIval = self.data.marginalInterp[suppIdx](muC)
- else:
- mIval = mIvals[suppIdx]
- for ij, j in enumerate(suppEffPt):
- res[i] += np.expand_dims(objs[j][i], - 1) * mIval[ij]
- return res
-
def interpolateMarginalInterpolator(self, mu : paramList = []):
"""Obtain interpolated approximant interpolator."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
muC = self.centerNormalizeMarginal(mu)
- mIvals = [None] * len(self.data.suppEffPts)
- for suppIdx in range(len(self.data.suppEffPts)):
- mIvals[suppIdx] = self.data.marginalInterp[suppIdx](muC)
+ mIvals = self.data.marginalInterp(muC)
poless = self.interpolateMarginalPoles(mu, mIvals)
coeffss = self.interpolateMarginalCoeffs(mu, mIvals)
his = [None] * len(mu)
for j in range(len(mu)):
his[j] = HI()
his[j].poles = poless[..., j]
his[j].coeffs = coeffss[..., j]
his[j].npar = 1
his[j].polybasis = self.data.HIs[0].polybasis
return his
def interpolateMarginalPoles(self, mu : paramList = [],
mIvals : Np2D = None) -> Np1D:
"""Obtain interpolated approximant poles."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
vbMng(self, "INIT",
"Interpolating marginal poles at mu = {}.".format(mu), 95)
- intMPoles = self._interpolateMarginal(mu,
- [hi.poles for hi in self.data.HIs],
- mIvals)
+ intMPoles = np.zeros(self.data.polesEff[0].shape + (len(mu),),
+ dtype = self.data.polesEff[0].dtype)
+ if mIvals is None:
+ mIvals = self.data.marginalInterp(self.centerNormalizeMarginal(mu))
+ for pEff, mI in zip(self.data.polesEff, mIvals):
+ intMPoles += np.expand_dims(pEff, - 1) * mI
vbMng(self, "DEL", "Done interpolating marginal poles.", 95)
return intMPoles
def interpolateMarginalCoeffs(self, mu : paramList = [],
mIvals : Np2D = None) -> Np1D:
"""Obtain interpolated approximant coefficients."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
vbMng(self, "INIT",
"Interpolating marginal coefficients at mu = {}.".format(mu), 95)
- intMCoeffs = self._interpolateMarginal(mu,
- [hi.coeffs for hi in self.data.HIs],
- mIvals)
+ intMCoeffs = np.zeros(self.data.coeffsEff[0].shape + (len(mu),),
+ dtype = self.data.coeffsEff[0].dtype)
+ if mIvals is None:
+ mIvals = self.data.marginalInterp(self.centerNormalizeMarginal(mu))
+ for cEff, mI in zip(self.data.coeffsEff, mIvals):
+ intMCoeffs += np.expand_dims(cEff, - 1) * mI
vbMng(self, "DEL", "Done interpolating marginal coefficients.", 95)
return intMCoeffs
diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
index c4f1b66..05b9e77 100644
--- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
+++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
@@ -1,320 +1,324 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from scipy.special import factorial as fact
from itertools import combinations
from rrompy.reduction_methods.standard.trained_model.trained_model_rational \
import TrainedModelRational
from rrompy.utilities.base.types import (Np1D, List, ListAny, paramVal,
paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
+from rrompy.utilities.numerical import dot
from rrompy.utilities.numerical.compress_matrix import compressMatrix
from rrompy.utilities.numerical.point_matching import potential
from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside,
HeavisideInterpolator as HI)
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList
from rrompy.sampling import emptySampleList
__all__ = ['TrainedModelPivotedRationalNoMatch']
class TrainedModelPivotedRationalNoMatch(TrainedModelRational):
"""
ROM approximant evaluation for pivoted approximants based on interpolation
of rational approximants (without pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def compress(self, collapse : bool = False, tol : float = 0., *args,
**kwargs):
if not collapse and tol <= 0.: return
RMat = self.data.projMat
if not collapse:
if hasattr(self.data, "_compressTol"):
RROMPyWarning(("Recompressing already compressed model is "
"ineffective. Aborting."))
return
self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args,
**kwargs)
for obj in self.data.HIs:
obj.postmultiplyTensorize(RMat.T)
if hasattr(self, "_HIsExcl"):
for obj in self._HIsExcl:
obj.postmultiplyTensorize(RMat.T)
if hasattr(self.data, "Ps"):
for obj in self.data.Ps:
obj.postmultiplyTensorize(RMat.T)
if hasattr(self, "_PsExcl"):
for obj in self._PsExcl:
obj.postmultiplyTensorize(RMat.T)
+ if hasattr(self.data, "coeffsEff"):
+ for j in range(len(self.data.coeffsEff)):
+ self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T)
super(TrainedModelRational, self).compress(collapse, tol)
def centerNormalizePivot(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot.
Returns:
Normalized parameter.
"""
mu = checkParameterList(mu, self.data.nparPivot)[0]
if mu0 is None: mu0 = self.data.mu0Pivot
rad = ((mu ** self.data.rescalingExpPivot
- mu0 ** self.data.rescalingExpPivot)
/ self.data.scaleFactorPivot)
return rad
def setupMarginalInterp(self, rDWM : Np1D = None):
self.data.NN = NNI()
self.data.NN.setupByInterpolation(self.data.musMarginal,
np.arange(len(self.data.musMarginal)),
1, rDWM)
def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs):
if hasattr(self, "_idxExcl"):
for j, excl in enumerate(self._idxExcl):
self.data.musMarginal.insert(self._musMExcl[j], excl)
self.data.HIs.insert(excl, self._HIsExcl[j])
self.data.Ps.insert(excl, self._PsExcl[j])
self.data.Qs.insert(excl, self._QsExcl[j])
self._idxExcl, self._musMExcl = list(np.sort(exclude)), []
self._HIsExcl, self._PsExcl, self._QsExcl = [], [], []
for excl in self._idxExcl[::-1]:
self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl
self.data.musMarginal.pop(excl)
self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl
self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl
self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl
poles = [hi.poles for hi in self.data.HIs]
coeffs = [hi.coeffs for hi in self.data.HIs]
self.initializeFromLists(poles, coeffs, self.data.HIs[0].polybasis,
*args, **kwargs)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str):
"""Initialize Heaviside representation."""
self.data.HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
self.data.HIs += [hsi]
def initializeFromRational(self):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, basis)
def recompressByCutOff(self, tol:float, foci:List[np.complex],
ground:float) -> str:
mu0 = np.mean(foci)
gLocPoles = [potential(hi.poles, foci) - ground <= tol * ground
for hi in self.data.HIs]
nRemPole = np.sum([np.sum(np.logical_not(gLPi)) for gLPi in gLocPoles])
if nRemPole == 0: return " No poles erased."
for hi, gLocPolesi in zip(self.data.HIs, gLocPoles):
N = len(hi.poles)
polyCorrection = np.zeros_like(hi.coeffs[0, :])
for j, goodj in enumerate(gLocPolesi):
if not goodj and not np.isinf(hi.poles[j]):
polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j])
if len(hi.coeffs) == N:
hi.coeffs = np.vstack((hi.coeffs, polyCorrection))
else:
hi.coeffs[N, :] += polyCorrection
hi.poles = hi.poles[gLocPolesi]
gLocCoeffi = np.append(gLocPolesi,
np.ones(len(hi.coeffs) - N, dtype = bool))
hi.coeffs = hi.coeffs[gLocCoeffi, :]
return " Erased {} pole{}.".format(nRemPole, "s" * (nRemPole != 1))
def interpolateMarginalInterpolator(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant interpolator."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu),
95)
intM = np.array(self.data.NN(mu), dtype = int)
vbMng(self, "DEL", "Done finding nearest neighbor.", 95)
return [self.data.HIs[i] for i in intM]
def interpolateMarginalPoles(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant poles."""
interps = self.interpolateMarginalInterpolator(mu)
return np.moveaxis(np.array([interp.poles for interp in interps]),
0, -1)
def interpolateMarginalCoeffs(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant poles."""
interps = self.interpolateMarginalInterpolator(mu)
return np.moveaxis(np.array([interp.coeffs for interp in interps]),
0, -1)
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
self.uApproxReduced = emptySampleList()
muP = self.centerNormalizePivot(mu.data[:,
self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
uAppR = hi(mP)
if i == 0:
self.uApproxReduced.reset((len(uAppR), len(mu)),
dtype = uAppR.dtype)
self.uApproxReduced[i] = uAppR
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
p = emptySampleList()
p.reset((len(self.data.HIs[0].coeffs.shape[1]), len(mu)))
muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
p[i] = hi(mP) * np.prod(mP[0] - hi.poles)
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
if der is None:
derP, derM = 0, [0]
else:
derP = der[self.data.directionPivot[0]]
derM = [der[x] for x in self.data.directionMarginal]
if np.any(np.array(derM) != 0):
raise RROMPyException(("Derivatives of Q with respect to marginal "
"parameters not allowed."))
sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
derVal = np.zeros(len(mu), dtype = np.complex)
pls = self.interpolateMarginalPoles(muM)
for i, (mP, pl) in enumerate(zip(muP, pls)):
N = len(pl)
if derP == N: derVal[i] = 1.
elif derP >= 0 and derP < N:
plDist = muP[0] - pl
for terms in combinations(np.arange(N), N - derP):
derVal[i] += np.prod(plDist[list(terms)], axis = 1)
return sclP ** derP * fact(derP) * derVal
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
try:
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise
except:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = self.interpolateMarginalPoles(mMarg)[..., 0]
return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim]
+ self.data.scaleFactor[rDim] * roots,
1. / self.data.rescalingExp[rDim])
def getResidues(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
elif len(args) == 0:
mVals = [None]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
res = self.interpolateMarginalCoeffs(mMarg)[: len(pls), :, 0].T
if not self.data._collapsed:
res = self.data.projMat[:, : res.shape[0]].dot(res)
return pls, res