Page MenuHomec4science

trained_model_pivoted_rational_nomatch.py
No OneTemporary

File Metadata

Created
Tue, Jun 4, 15:41

trained_model_pivoted_rational_nomatch.py

# Copyright (C) 2018-2020 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from collections.abc import Iterable
from rrompy.reduction_methods.standard.trained_model.trained_model_rational \
import TrainedModelRational
from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal,
paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import dot
from rrompy.utilities.numerical.compress_matrix import compressMatrix
from rrompy.utilities.poly_fitting.heaviside import rational2heaviside
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList
from rrompy.sampling import emptySampleList
__all__ = ['TrainedModelPivotedRationalNoMatch']
class TrainedModelPivotedRationalNoMatch(TrainedModelRational):
    """
    ROM approximant evaluation for pivoted approximants based on interpolation
        of rational approximants (without pole matching).

    Attributes:
        Data: dictionary with all that can be pickled.

    Notes:
        The methods below read, from self.data, one numerator (Ps), one
        denominator (Qs) and one support offset (Psupp) per marginal sample
        (musMarginal). Samples removed through updateEffectiveSamples are
        parked in the private lists _idxExcl, _musMExcl, _PsExcl, _QsExcl and
        _PsuppExcl.
    """

    @property
    def supportEnd(self):
        """
        Largest end index among the supports of all stored numerators.

        The j-th numerator occupies rows Psupp[j] : Psupp[j] + Ps[j].shape[0].
        Both the active numerators and the excluded ones (if any) are taken
        into account.

        Returns:
            Maximum of Psupp[j] + Ps[j].shape[0]; 0 if no numerator is stored.
        """
        ends = [supp + P.shape[0]
                for supp, P in zip(self.data.Psupp, self.data.Ps)]
        if hasattr(self, "_PsuppExcl"):
            ends += [supp + P.shape[0]
                     for supp, P in zip(self._PsuppExcl, self._PsExcl)]
        # Taking the max over the ends (rather than the end of the block with
        # the largest start) stays correct even if supports overlap or if the
        # list of active numerators is empty.
        return max(ends, default = 0)

    def checkParameterListPivot(self, mu:paramList,
                                check_if_single : bool = False) -> paramList:
        """Validate mu as a list of parameters with nparPivot components."""
        return checkParameterList(mu, self.data.nparPivot, check_if_single)

    def checkParameterListMarginal(self, mu:paramList,
                                   check_if_single : bool = False) -> paramList:
        """Validate mu as a list of parameters with nparMarginal components."""
        return checkParameterList(mu, self.data.nparMarginal, check_if_single)

    def compress(self, collapse : bool = False, tol : float = 0.,
                 returnRMat : bool = False, **compressMatrixkwargs):
        """
        Compress (or collapse) the model by folding a matrix into numerators.

        Args:
            collapse: Whether to fold the current projection matrix itself
                into the numerators; in this case compressMatrix is not
                called.
            tol: Tolerance forwarded to compressMatrix. Only used if collapse
                is False; if it is not positive (and collapse is False), the
                method does nothing.
            returnRMat: Whether to return the matrix that was applied to the
                numerators.
            **compressMatrixkwargs: Forwarded to compressMatrix.

        Returns:
            The applied matrix if returnRMat is True and the compression was
                carried out; None otherwise (including the no-op and abort
                paths).

        Raises:
            RROMPyException: if the output is flagged as quadratic.
        """
        if not collapse and tol <= 0.: return
        if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic:
            raise RROMPyException(("Cannot compress model with quadratic "
                                   "output."))
        RMat = self.data.projMat
        if not collapse:
            if hasattr(self.data, "_compressTol"):
                RROMPyWarning(("Recompressing already compressed model is "
                               "ineffective. Aborting."))
                return
            self.data.projMat, RMat, _ = compressMatrix(RMat, tol,
                                                        **compressMatrixkwargs)
        # Each numerator only sees the rows of RMat.T matching its support.
        if hasattr(self.data, "Ps"):
            for obj, suppj in zip(self.data.Ps, self.data.Psupp):
                obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
        if hasattr(self, "_PsExcl"):
            for obj, suppj in zip(self._PsExcl, self._PsuppExcl):
                obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
            self._PsuppExcl = [0] * len(self._PsuppExcl)
        # After the multiplication all numerators start at offset 0.
        self.data.Psupp = [0] * len(self.data.Psupp)
        # The MRO lookup starts after TrainedModelRational, i.e. that class's
        # own compress is skipped.
        # NOTE(review): presumably because it acts on a single numerator that
        # this model does not store -- confirm.
        super(TrainedModelRational, self).compress(collapse, tol)
        if returnRMat: return RMat

    def centerNormalizePivot(self, mu : paramList = None,
                             mu0 : paramVal = None) -> paramList:
        """
        Compute normalized parameter to be plugged into approximant.

        Args:
            mu: Parameter(s) 1. If None, an empty list is used.
            mu0: Parameter(s) 2. If None, set to the pivot components of
                self.data.mu0.

        Returns:
            Normalized parameter, i.e. difference of the mapped mu and mapped
                mu0, divided by the scale factors of the pivot directions.
        """
        if mu is None: mu = []
        mu = self.checkParameterListPivot(mu)
        dirPivot = self.data.directionPivot
        if mu0 is None:
            mu0 = self.checkParameterListPivot(self.data.mu0(0, dirPivot))
        scale = [self.data.scaleFactor[x] for x in dirPivot]
        return (self.mapParameterList(mu, idx = dirPivot)
              - self.mapParameterList(mu0, idx = dirPivot)) / scale

    def setupMarginalInterp(self, interpPars:ListAny):
        """
        Build the nearest-neighbor interpolator over the marginal samples.

        The interpolated values are the sample indices 0, ...,
            len(musMarginal) - 1, so that evaluating the interpolator yields
            the index of the local model to be used.

        Args:
            interpPars: Extra positional arguments for setupByInterpolation.
        """
        self.data.marginalInterp = NNI()
        self.data.marginalInterp.setupByInterpolation(self.data.musMarginal,
                                         np.arange(len(self.data.musMarginal)),
                                                      1, *interpPars)

    def updateEffectiveSamples(self, exclude:List[int]):
        """
        Set which marginal samples are excluded from the model.

        Previously excluded samples (if any) are first put back in place;
            then the samples listed in exclude are moved from self.data to
            the private exclusion lists.

        Args:
            exclude: Indices of the samples to be excluded. Repeated indices
                are counted once.
        """
        if hasattr(self, "_idxExcl"):
            # Indices are stored in increasing order, so re-inserting in that
            # order puts each item back at its original position.
            for j, excl in enumerate(self._idxExcl):
                # NOTE(review): musMarginal.insert is called as (item, index),
                # unlike the list.insert calls below -- presumably the
                # signature of the project's parameter list; confirm.
                self.data.musMarginal.insert(self._musMExcl[j], excl)
                self.data.Ps.insert(excl, self._PsExcl[j])
                self.data.Qs.insert(excl, self._QsExcl[j])
                self.data.Psupp.insert(excl, self._PsuppExcl[j])
        # np.unique sorts and drops duplicates: popping the same index twice
        # would remove an unrelated sample.
        self._idxExcl = list(np.unique(exclude))
        self._musMExcl, self._PsExcl = [], []
        self._QsExcl, self._PsuppExcl = [], []
        # Pop from the back so that the remaining indices stay valid; prepend
        # so that the exclusion lists follow the order of _idxExcl.
        for excl in reversed(self._idxExcl):
            self._musMExcl.insert(0, self.data.musMarginal[excl])
            self.data.musMarginal.pop(excl)
            self._PsExcl.insert(0, self.data.Ps.pop(excl))
            self._QsExcl.insert(0, self.data.Qs.pop(excl))
            self._PsuppExcl.insert(0, self.data.Psupp.pop(excl))

    def _setupQuadMapping(self, N2 : List[int] = None):
        """
        Assemble self.quad_mapping by concatenating the mappings of all
            local models, each shifted by the corresponding support offset.

        Args:
            N2: Sizes passed one by one to the parent _setupQuadMapping. If
                None, they are computed from the numerator sizes N as N ** 2
                (if _is_C_quadratic == 1) or N * (N + 1) // 2 (otherwise).
        """
        if not (hasattr(self.data, "_is_C_quadratic")
            and self.data._is_C_quadratic):
            RROMPyWarning(("Quadratic mapping is useless if output is not "
                           "quadratic. Aborting."))
            return
        if N2 is None:
            N2 = np.array([P.shape[0] for P in self.data.Ps], dtype = int)
            # NOTE(review): the conversion below is assumed to apply only to
            # sizes computed here (a user-given N2 is taken as is) -- confirm
            # against the reference implementation.
            if self.data._is_C_quadratic == 1:
                N2 = N2 ** 2
            else: # if self.data._is_C_quadratic == 2:
                N2 = N2 * (N2 + 1) // 2
        # Skip if a mapping of the right total size is already available.
        if (hasattr(self, "quad_mapping")
        and self.quad_mapping.shape[1] == np.sum(N2)): return
        quad_mapping = np.empty((2, 0), dtype = int)
        for Nloc, suppj in zip(N2, self.data.Psupp):
            # Reset the attribute before delegating to the parent, which then
            # stores the local mapping in self.quad_mapping.
            self.quad_mapping = np.empty((0, 0))
            super()._setupQuadMapping(Nloc)
            quad_mapping = np.append(quad_mapping, self.quad_mapping + suppj,
                                     axis = 1)
        self.quad_mapping = quad_mapping

    def getPVal(self, mu : paramList = None) -> sampList:
        """
        Evaluate rational numerator at arbitrary parameter.

        For each parameter, the numerator of the local model selected by the
            marginal interpolator is evaluated, and stored in the rows given
            by its support; all other rows are zero.

        Args:
            mu: Target parameter. If None, an empty list is used.

        Returns:
            Sample list with supportEnd rows and one column per parameter.
        """
        RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
        if mu is None: mu = []
        mu = self.checkParameterList(mu)
        muP = self.centerNormalizePivot(mu(self.data.directionPivot))
        muM = self.checkParameterListMarginal(mu(self.data.directionMarginal))
        intM = np.array(self.data.marginalInterp(muM), dtype = int)
        p = emptySampleList()
        vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17)
        for i, iM in enumerate(intM):
            Pval, supp = self.data.Ps[iM](muP[i]), self.data.Psupp[iM]
            if i == 0:
                # Storage is allocated lazily, since the dtype is only known
                # after the first evaluation.
                p.reset((self.supportEnd, len(mu)), dtype = Pval.dtype)
                p.data[:] = 0.
            p.data[supp : supp + len(Pval), i] = Pval[:, 0]
        vbMng(self, "DEL", "Done evaluating numerator.", 17)
        return p

    def getQVal(self, mu:Np1D, der : List[int] = None,
                scl : Np1D = None) -> Np1D:
        """
        Evaluate rational denominator at arbitrary parameter.

        Args:
            mu: Target parameter.
            der(optional): Derivatives to take before evaluation. Only the
                entry of the pivot direction may be nonzero.
            scl(optional): Scaling factors. Only the entry of the pivot
                direction is used; if None, 1 is used.

        Returns:
            Numpy vector with one denominator value per parameter.

        Raises:
            RROMPyException: if a derivative with respect to a marginal
                parameter is requested.
        """
        RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
        mu = self.checkParameterList(mu)
        muP = self.centerNormalizePivot(mu(self.data.directionPivot))
        muM = self.checkParameterListMarginal(mu(self.data.directionMarginal))
        if der is None:
            derP, derM = 0, [0]
        else:
            derP = der[self.data.directionPivot[0]]
            derM = [der[x] for x in self.data.directionMarginal]
        if np.any(np.array(derM) != 0):
            raise RROMPyException(("Derivatives of Q with respect to marginal "
                                   "parameters not allowed."))
        sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
        intM = np.array(self.data.marginalInterp(muM), dtype = int)
        vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu),
              17)
        q = np.array([self.data.Qs[iM](mP, derP, sclP)[0]
                      for iM, mP in zip(intM, muP)])
        vbMng(self, "DEL", "Done evaluating denominator.", 17)
        return q

    def _parseMarginalVals(self, args, kwargs):
        """
        Parse the arguments shared by getPoles and getResidues.

        Args:
            args: Positional arguments received by the caller (at most 1).
            kwargs: Keyword arguments received by the caller (at most 1, with
                key "marginalVals").

        Returns:
            Position of the 'freepar' entry, and index of the local model
                selected by the marginal interpolator.

        Raises:
            RROMPyException: if more than one argument is given, if 'freepar'
                appears more than once, or if it is not in the pivot
                direction.
            KeyError: if the only keyword argument is not "marginalVals".
            ValueError: if no 'freepar' entry is present.
        """
        if len(args) + len(kwargs) > 1:
            raise RROMPyException(("Wrong number of parameters passed. "
                                   "Only 1 available."))
        elif len(args) + len(kwargs) == 1:
            if len(args) == 1:
                mVals = args[0]
            else:
                mVals = kwargs["marginalVals"]
            if not isinstance(mVals, Iterable): mVals = [mVals]
            mVals = list(mVals)
        else:
            mVals = [fp]
        rDim = mVals.index(fp)
        if fp in mVals[rDim + 1 :]:
            raise RROMPyException(("Exactly 1 'freepar' entry in "
                                   "marginalVals must be provided."))
        if rDim != self.data.directionPivot[0]:
            raise RROMPyException(("'freepar' entry in marginalVals must "
                                   "coincide with pivot direction."))
        # Only the non-free entries identify the marginal point.
        muM = self.checkParameterListMarginal(mVals[: rDim]
                                            + mVals[rDim + 1 :])
        iM = int(self.data.marginalInterp(muM))
        return rDim, iM

    def getPoles(self, *args, **kwargs) -> Np1D:
        """
        Obtain approximant poles.

        Args:
            marginalVals(optional): Parameter values, with a single 'freepar'
                entry in the pivot direction. May be passed either as only
                positional argument or as keyword argument. Defaults to
                ['freepar'].

        Returns:
            Numpy complex vector of poles.
        """
        RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
        rDim, iM = self._parseMarginalVals(args, kwargs)
        # Roots of the local denominator, rescaled and shifted by the mapped
        # center before being mapped back.
        roots = self.data.scaleFactor[rDim] * self.data.Qs[iM].roots()
        center = self.mapParameterList(self.data.mu0(rDim),
                                       idx = [rDim])(0, 0)
        return self.mapParameterList(center + roots, "B", [rDim])(0)

    def getResidues(self, *args, **kwargs) -> tuple:
        """
        Obtain approximant residues.

        Args:
            marginalVals(optional): Parameter values, with a single 'freepar'
                entry in the pivot direction. May be passed either as only
                positional argument or as keyword argument. Defaults to
                ['freepar'].

        Returns:
            Tuple (poles, residues), with poles as returned by
                rational2heaviside and residues as numpy matrix.
        """
        RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
        _, iM = self._parseMarginalVals(args, kwargs)
        # NOTE(review): poles are returned as given by rational2heaviside,
        # without the rescaling and mapping applied in getPoles -- presumably
        # they are in normalized pivot coordinates; confirm.
        res, pls, _ = rational2heaviside(self.data.Ps[iM], self.data.Qs[iM])
        # Keep only the rows associated with poles.
        res = res[: len(pls), :].T
        if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic:
            self._setupQuadMapping()
            res = res[self.quad_mapping[0]] * res[self.quad_mapping[1]].conj()
        # NOTE(review): the non-collapsed branch transposes once more after
        # the projection, so the orientation of the returned matrix differs
        # between collapsed and non-collapsed models -- confirm intended.
        if not self.data._collapsed: res = dot(self.data.projMat, res).T
        if (hasattr(self.data, "_is_C_quadratic")
        and self.data._is_C_quadratic == 2):
            res = np.real(res)
        return pls, res

Event Timeline