Page MenuHomec4science

trained_model_pivoted_rational_polematch.py
No OneTemporary

File Metadata

Created
Tue, May 7, 09:16

trained_model_pivoted_rational_polematch.py

# Copyright (C) 2018-2020 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import warnings
import numpy as np
from scipy.special import factorial as fact
from scipy.sparse import csr_matrix, hstack, SparseEfficiencyWarning
from collections.abc import Iterable
from copy import deepcopy as copy
from itertools import combinations
from rrompy.reduction_methods.standard.trained_model.trained_model_rational \
import TrainedModelRational
from .trained_model_pivoted_rational_nomatch import (
TrainedModelPivotedRationalNoMatch)
from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal,
paramList, sampList, HFEng)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import dot
from rrompy.utilities.numerical.point_matching import rationalFunctionMatching
from rrompy.utilities.numerical.degree import reduceDegreeN
from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside,
heavisideUniformShape,
HeavisideInterpolator as HI)
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.poly_fitting.piecewise_linear import (sparsekinds,
PiecewiseLinearInterpolator as PLI)
from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert
from rrompy.sampling import sampleList, emptySampleList
__all__ = ['TrainedModelPivotedRationalPoleMatch']
class TrainedModelPivotedRationalPoleMatch(TrainedModelPivotedRationalNoMatch):
"""
ROM approximant evaluation for pivoted approximants based on interpolation
of rational approximants (with pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def compress(self, collapse : bool = False, tol : float = 0.,
returnRMat : bool = False, **compressMatrixkwargs):
Psupp = copy(self.data.Psupp)
RMat = super().compress(collapse, tol, True, **compressMatrixkwargs)
if RMat is None: return
for obj, suppj in zip(self.data.HIs, Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if hasattr(self, "_HIsExcl"):
for obj, suppj in zip(self._HIsExcl, Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if not hasattr(self, "_PsExcl"):
self._PsuppExcl = [0] * len(self._PsuppExcl)
if returnRMat: return RMat
def centerNormalizeMarginal(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal.
Returns:
Normalized parameter.
"""
mu = self.checkParameterListMarginal(mu)
if mu0 is None:
mu0 = self.checkParameterListMarginal(
self.data.mu0(0, self.data.directionMarginal))
return (self.mapParameterList(mu, idx = self.data.directionMarginal)
- self.mapParameterList(mu0, idx = self.data.directionMarginal)
) / [self.data.scaleFactor[x]
for x in self.data.directionMarginal]
def setupMarginalInterp(self, approx, interpPars:ListAny, extraPar = None):
vbMng(self, "INIT", "Starting computation of marginal interpolator.",
12)
musMCN = self.centerNormalizeMarginal(self.data.musMarginal)
nM, pbM = len(musMCN), approx.polybasisMarginal
if pbM in ppb + rbpb:
if extraPar: approx._setMMarginalAuto()
_MMarginalEff = approx.paramsMarginal["MMarginal"]
if pbM in ppb:
p = PI()
elif pbM in rbpb:
p = RBI()
else: # if pbM in sparsekinds + ["NEARESTNEIGHBOR"]:
if pbM == "NEARESTNEIGHBOR":
p = NNI()
else: # if pbM in sparsekinds:
pllims = [[-1.] * self.data.nparMarginal,
[1.] * self.data.nparMarginal]
p = PLI()
for ipts, pts in enumerate(self.data.suppEffPts):
if len(pts) == 0:
raise RROMPyException("Empty list of support points.")
musMCNEff, valsEff = musMCN[pts], np.eye(len(pts))
if pbM in ppb + rbpb:
if extraPar:
if ipts > 0:
verb = approx.verbosity
approx.verbosity = 0
_musM = approx.musMarginal
approx.musMarginal = musMCNEff
approx._setMMarginalAuto()
approx.musMarginal = _musM
approx.verbosity = verb
else:
approx.paramsMarginal["MMarginal"] = reduceDegreeN(
_MMarginalEff, len(musMCNEff), self.data.nparMarginal,
approx.paramsMarginal["polydegreetypeMarginal"])
MMEff = approx.paramsMarginal["MMarginal"]
while MMEff >= 0:
wellCond, msg = p.setupByInterpolation(musMCNEff, valsEff,
MMEff, *interpPars)
vbMng(self, "MAIN", msg, 30)
if wellCond: break
vbMng(self, "MAIN",
("Polyfit is poorly conditioned. Reducing "
"MMarginal by 1."), 35)
MMEff -= 1
if MMEff < 0:
raise RROMPyException(("Instability in computation of "
"interpolant. Aborting."))
if (pbM in rbpb and len(interpPars) > 4
and "optimizeScalingBounds" in interpPars[4].keys()):
interpPars[4]["optimizeScalingBounds"] = [-1., -1.]
elif pbM == "NEARESTNEIGHBOR":
if ipts > 0: interpPars[0] = 1
p.setupByInterpolation(musMCNEff, valsEff, *interpPars)
elif ipts == 0: # and pbM in sparsekinds:
p.setupByInterpolation(musMCNEff, valsEff, pllims,
extraPar[pts], *interpPars)
if ipts == 0:
self.data.marginalInterp = copy(p)
self.data.coeffsEff, self.data.polesEff = [], []
for hi, sup in zip(self.data.HIs, self.data.Psupp):
cEff = hi.coeffs
if (self.data._collapsed
or self.supportEnd == cEff.shape[1]):
cEff = copy(cEff)
else:
supC = self.supportEnd - sup - cEff.shape[1]
cEff = hstack((csr_matrix((len(cEff), sup)),
csr_matrix(cEff),
csr_matrix((len(cEff), supC))), "csr")
self.data.coeffsEff += [cEff]
self.data.polesEff += [copy(hi.poles)]
else:
ptsBad = [i for i in range(nM) if i not in pts]
idxBad = np.where(self.data.suppEffIdx == ipts)[0]
warnings.simplefilter('ignore', SparseEfficiencyWarning)
if pbM in sparsekinds:
for ij, j in enumerate(ptsBad):
nearest = pts[np.argmin(np.sum(np.abs(musMCNEff.data
- np.tile(musMCN[j], [len(pts), 1])
), axis = 1).flatten())]
self.data.coeffsEff[j][idxBad] = copy(
self.data.coeffsEff[nearest][idxBad])
self.data.polesEff[j][idxBad] = copy(
self.data.polesEff[nearest][idxBad])
else:
if (self.data._collapsed
or self.supportEnd == cEff.shape[1]):
cfBase = np.zeros((len(idxBad), cEff.shape[1]),
dtype = cEff.dtype)
else:
cfBase = csr_matrix((len(idxBad), self.supportEnd),
dtype = cEff.dtype)
valMuMBad = p(musMCN[ptsBad])
for ijb, jb in enumerate(ptsBad):
self.data.coeffsEff[jb][idxBad] = copy(cfBase)
self.data.polesEff[jb][idxBad] = 0.
for ij, j in enumerate(pts):
val = valMuMBad[ij][ijb]
if not np.isclose(val, 0.):
self.data.coeffsEff[jb][idxBad] += (val
* self.data.coeffsEff[j][idxBad])
self.data.polesEff[jb][idxBad] += (val
* self.data.polesEff[j][idxBad])
warnings.filters.pop(0)
if pbM in ppb + rbpb:
approx.paramsMarginal["MMarginal"] = _MMarginalEff
vbMng(self, "DEL", "Done computing marginal interpolator.", 12)
def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs):
if hasattr(self, "_idxExcl"):
for j, excl in enumerate(self._idxExcl):
self.data.HIs.insert(excl, self._HIsExcl[j])
super().updateEffectiveSamples(exclude)
self._HIsExcl = []
for excl in self._idxExcl[::-1]:
self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl
poles = [hi.poles for hi in self.data.HIs]
coeffs = [hi.coeffs for hi in self.data.HIs]
self.initializeFromLists(poles, coeffs, self.data.Psupp,
self.data.HIs[0].polybasis, *args, **kwargs)
def initializeFromRational(self, matchingWeight:float, HFEngine:HFEng,
is_state:bool):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, self.data.Psupp, basis,
matchingWeight, HFEngine, is_state)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny,
basis:str, matchingWeight:float, HFEngine:HFEng,
is_state:bool):
"""Initialize Heaviside representation."""
Ns = [len(pls) for pls in poles]
poles, coeffs = heavisideUniformShape(poles, coeffs)
root = Ns.index(len(poles[0]))
if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic:
csizemax = np.max([len(c) for c in coeffs])
#csizemax = np.max([c.shape[1] for c in coeffs])
if self.data._is_C_quadratic == 1:
csizemax = csizemax ** 2
else: # if self.data._is_C_quadratic == 2:
csizemax = csizemax * (csizemax + 1) // 2
TrainedModelRational._setupQuadMapping(self, csizemax)
projMapping = self.quad_mapping
projMappingReal = self.data._is_C_quadratic == 2
else:
projMapping, projMappingReal = None, False
poles, coeffs = rationalFunctionMatching(poles, coeffs,
self.data.musMarginal.data,
matchingWeight, supps,
self.data.projMat, HFEngine,
is_state, root, projMapping,
projMappingReal)
self.data.HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
if len(cfs) == len(pls):
cfs = np.pad(cfs, ((0, 1), (0, 0)), "constant")
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
self.data.HIs += [hsi]
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(len(poles[0]), dtype = int)
def checkSharedRatio(self, shared:float) -> str:
N = len(self.data.HIs[0].poles)
M = len(self.data.HIs)
goodLocPoles = np.array([np.logical_not(np.isinf(hi.poles)
) for hi in self.data.HIs])
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(N, dtype = int)
if np.all(np.all(goodLocPoles)): return " No poles erased."
goodGlobPoles = np.sum(goodLocPoles, axis = 0)
goodEnoughPoles = goodGlobPoles >= max(1., 1. * shared * M)
keepPole = np.where(goodEnoughPoles)[0]
halfPole = np.where(np.logical_and(goodEnoughPoles,
goodGlobPoles < M))[0]
removePole = np.where(np.logical_not(goodEnoughPoles))[0]
if len(removePole) > 0:
keepCoeff = np.append(keepPole,
np.arange(N, len(self.data.HIs[0].coeffs)))
for hi in self.data.HIs:
for j in removePole:
if not np.isinf(hi.poles[j]):
hi.coeffs[N, :] -= hi.coeffs[j, :] / hi.poles[j]
hi.poles = hi.poles[keepPole]
hi.coeffs = hi.coeffs[keepCoeff, :]
for idxR in halfPole:
pts = np.where(goodLocPoles[:, idxR])[0]
idxEff = len(self.data.suppEffPts)
for idEff, prevPts in enumerate(self.data.suppEffPts):
if len(prevPts) == len(pts):
if np.allclose(prevPts, pts):
idxEff = idEff
break
if idxEff == len(self.data.suppEffPts):
self.data.suppEffPts += [pts]
self.data.suppEffIdx[idxR] = idxEff
self.data.suppEffIdx = self.data.suppEffIdx[keepPole]
return (" Hard-erased {} pole".format(len(removePole))
+ "s" * (len(removePole) != 1)
+ " and soft-erased {} pole".format(len(halfPole))
+ "s" * (len(halfPole) != 1) + ".")
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
muP = self.centerNormalizePivot(mu(self.data.directionPivot))
muM = mu(self.data.directionMarginal)
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
uAppR = hi(mP)[:, 0]
if i == 0:
uApproxR = np.empty((len(uAppR), len(mu)),
dtype = uAppR.dtype)
uApproxR[:, i] = uAppR
self.uApproxReduced = sampleList(uApproxR)
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant interpolator."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal models at mu = {}.".format(mu), 95)
his = []
muC = self.centerNormalizeMarginal(mu)
mIvals = self.data.marginalInterp(muC)
verb, self.verbosity = self.verbosity, 0
poless = self.interpolateMarginalPoles(mu, mIvals)
coeffss = self.interpolateMarginalCoeffs(mu, mIvals)
self.verbosity = verb
for j in range(len(mu)):
his += [HI()]
his[-1].poles = poless[j]
his[-1].coeffs = coeffss[j]
his[-1].npar = 1
his[-1].polybasis = self.data.HIs[0].polybasis
vbMng(self, "DEL", "Done interpolating marginal models.", 95)
return his
def interpolateMarginalPoles(self, mu : paramList = [],
mIvals : Np2D = None) -> ListAny:
"""Obtain interpolated approximant poles."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal poles at mu = {}.".format(mu), 95)
intMPoles = np.zeros((len(mu),) + self.data.polesEff[0].shape,
dtype = self.data.polesEff[0].dtype)
if mIvals is None:
muC = self.centerNormalizeMarginal(mu)
mIvals = self.data.marginalInterp(muC)
for pEff, mI in zip(self.data.polesEff, mIvals):
intMPoles += np.expand_dims(mI, - 1) * pEff
vbMng(self, "DEL", "Done interpolating marginal poles.", 95)
return intMPoles
def interpolateMarginalCoeffs(self, mu : paramList = [],
mIvals : Np2D = None) -> ListAny:
"""Obtain interpolated approximant coefficients."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal coefficients at mu = {}.".format(mu), 95)
intMCoeffs = np.zeros((len(mu),) + self.data.coeffsEff[0].shape,
dtype = self.data.coeffsEff[0].dtype)
if mIvals is None:
muC = self.centerNormalizeMarginal(mu)
mIvals = self.data.marginalInterp(muC)
for cEff, mI in zip(self.data.coeffsEff, mIvals):
for j, m in enumerate(mI): intMCoeffs[j] += m * cEff
vbMng(self, "DEL", "Done interpolating marginal coefficients.", 95)
return intMCoeffs
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
p = emptySampleList()
muP = self.centerNormalizePivot(mu(self.data.directionPivot))
muM = mu(self.data.directionMarginal)
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
Pval = hi(mP) * np.prod(mP[0] - hi.poles)
if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype)
p[i] = Pval
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
muP = self.centerNormalizePivot(mu(self.data.directionPivot))
muM = mu(self.data.directionMarginal)
if der is None:
derP, derM = 0, [0]
else:
derP = der[self.data.directionPivot[0]]
derM = [der[x] for x in self.data.directionMarginal]
if np.any(np.array(derM) != 0):
raise RROMPyException(("Derivatives of Q with respect to marginal "
"parameters not allowed."))
sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
derVal = np.zeros(len(mu), dtype = np.complex)
pls = self.interpolateMarginalPoles(muM)
for i, (mP, pl) in enumerate(zip(muP, pls)):
N = len(pl)
if derP == N: derVal[i] = 1.
elif derP >= 0 and derP < N:
plDist = mP[0] - pl
for terms in combinations(np.arange(N), N - derP):
derVal[i] += np.prod(plDist[list(terms)])
return sclP ** derP * fact(derP) * derVal
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not isinstance(mVals, Iterable): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)[0]
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = (self.data.scaleFactor[rDim]
* self.interpolateMarginalPoles(mMarg)[0])
return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim),
idx = [rDim])(0, 0)
+ roots, "B", [rDim])(0)
def getResidues(self, *args, **kwargs) -> Np2D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
elif len(args) == 0:
mVals = [None]
else:
mVals = kwargs["marginalVals"]
if not isinstance(mVals, Iterable): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :].T
if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic:
self._setupQuadMapping()
res = res[self.quad_mapping[0]] * res[self.quad_mapping[1]].conj()
if not self.data._collapsed: res = dot(self.data.projMat, res).T
if (hasattr(self.data, "_is_C_quadratic")
and self.data._is_C_quadratic == 2):
res = np.real(res)
return pls, res

Event Timeline