Page MenuHomec4science

trained_model_pivoted_rational_polematch.py
No OneTemporary

File Metadata

Created
Wed, May 1, 12:06

trained_model_pivoted_rational_polematch.py

# Copyright (C) 2018-2020 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import warnings
import numpy as np
from scipy.special import factorial as fact
from scipy.sparse import csr_matrix, hstack, SparseEfficiencyWarning
from copy import deepcopy as copy
from itertools import combinations
from .trained_model_pivoted_rational_nomatch import (
TrainedModelPivotedRationalNoMatch)
from .trained_model_pivoted_rational_match import (
TrainedModelPivotedRationalMatch)
from rrompy.utilities.base.types import (Tuple, Np1D, Np2D, List, ListAny,
paramList, sampList, HFEng)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import dot
from rrompy.utilities.numerical.point_matching import rationalFunctionMatching
from rrompy.utilities.numerical.degree import reduceDegreeN
from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside,
polyval as heavival,
heavisideUniformShape,
HeavisideInterpolator as HI)
from rrompy.utilities.poly_fitting.piecewise_linear import (sparsekinds,
PiecewiseLinearInterpolator as PLI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.sampling import sampleList, emptySampleList
__all__ = ['TrainedModelPivotedRationalPoleMatch']
class TrainedModelPivotedRationalPoleMatch(TrainedModelPivotedRationalMatch):
"""
ROM approximant evaluation for pivoted approximants based on interpolation
of rational approximants (with pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def compress(self, collapse : bool = False, tol : float = 0.,
returnRMat : bool = False, **compressMatrixkwargs):
Psupp = copy(self.data.Psupp)
RMat = super().compress(collapse, tol, True, **compressMatrixkwargs)
if RMat is None: return
for j in range(len(self.data.coeffsEff)):
self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T)
for obj, suppj in zip(self.data.HIs, Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if hasattr(self, "_HIsExcl"):
for obj, suppj in zip(self._HIsExcl, Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if not hasattr(self, "_PsExcl"):
self._PsuppExcl = [0] * len(self._PsuppExcl)
if returnRMat: return RMat
def setupMarginalInterp(self, approx, interpPars:ListAny, extraPar = None):
vbMng(self, "INIT", "Starting computation of marginal interpolator.",
12)
musMCN = self.centerNormalizeMarginal(self.data.musMarginal)
nM, pbM = len(musMCN), approx.polybasisMarginal
if pbM in ppb + rbpb:
if extraPar: approx._setMMarginalAuto()
_MMarginalEff = approx.paramsMarginal["MMarginal"]
if pbM in ppb:
p = PI()
elif pbM in rbpb:
p = RBI()
else: # if pbM in sparsekinds + ["NEARESTNEIGHBOR"]:
if pbM == "NEARESTNEIGHBOR":
p = NNI()
else: # if pbM in sparsekinds:
pllims = [[-1.] * self.data.nparMarginal,
[1.] * self.data.nparMarginal]
p = PLI()
for ipts, pts in enumerate(self.data.suppEffPts):
if len(pts) == 0:
raise RROMPyException("Empty list of support points.")
musMCNEff, valsEff = musMCN[pts], np.eye(len(pts))
if pbM in ppb + rbpb:
if extraPar:
if ipts > 0:
verb = approx.verbosity
approx.verbosity = 0
_musM = approx.musMarginal
approx.musMarginal = musMCNEff
approx._setMMarginalAuto()
approx.musMarginal = _musM
approx.verbosity = verb
else:
approx.paramsMarginal["MMarginal"] = reduceDegreeN(
_MMarginalEff, len(musMCNEff), self.data.nparMarginal,
approx.paramsMarginal["polydegreetypeMarginal"])
MMEff = approx.paramsMarginal["MMarginal"]
while MMEff >= 0:
wellCond, msg = p.setupByInterpolation(musMCNEff, valsEff,
MMEff, *interpPars)
vbMng(self, "MAIN", msg, 30)
if wellCond: break
vbMng(self, "MAIN",
("Polyfit is poorly conditioned. Reducing "
"MMarginal by 1."), 35)
MMEff -= 1
if MMEff < 0:
raise RROMPyException(("Instability in computation of "
"interpolant. Aborting."))
if (pbM in rbpb and len(interpPars) > 4
and "optimizeScalingBounds" in interpPars[4].keys()):
interpPars[4]["optimizeScalingBounds"] = [-1., -1.]
elif pbM == "NEARESTNEIGHBOR":
if ipts > 0: interpPars[0] = 1
p.setupByInterpolation(musMCNEff, valsEff, *interpPars)
elif ipts == 0: # and pbM in sparsekinds:
p.setupByInterpolation(musMCNEff, valsEff, pllims,
extraPar[pts], *interpPars)
if ipts == 0:
self.data.marginalInterp = copy(p)
self.data.coeffsEff, self.data.polesEff = [], []
N = len(self.data.suppEffIdx)
goodIdx = np.where(self.data.suppEffIdx != -1)[0]
for hi, sup in zip(self.data.HIs, self.data.Psupp):
pEff, cEff = hi.poles.reshape(-1, 1), hi.coeffs
cEffH = np.empty((cEff.shape[0], 0))
if (self.data._collapsed
or self.data.projMat.shape[1] == cEff.shape[1]):
cEff = np.hstack([cEff, cEffH])
else:
supC = self.data.projMat.shape[1] - sup - cEff.shape[1]
cEff = hstack((csr_matrix((len(cEff), sup)),
csr_matrix(cEff),
csr_matrix((len(cEff), supC)),
cEffH), "csr")
goodIdxC = np.append(goodIdx, np.arange(N, cEff.shape[0]))
self.data.coeffsEff += [cEff[goodIdxC, :]]
self.data.polesEff += [pEff[goodIdx]]
else:
ptsBad = [i for i in range(nM) if i not in pts]
idxBad = np.where(self.data.suppEffIdx[goodIdx] == ipts)[0]
warnings.simplefilter('ignore', SparseEfficiencyWarning)
if pbM in sparsekinds:
for ij, j in enumerate(ptsBad):
nearest = pts[np.argmin(np.sum(np.abs(musMCNEff.data
- np.tile(musMCN[j], [len(pts), 1])
), axis = 1).flatten())]
self.data.coeffsEff[j][idxBad] = copy(
self.data.coeffsEff[nearest][idxBad])
self.data.polesEff[j][idxBad] = copy(
self.data.polesEff[nearest][idxBad])
else:
if (self.data._collapsed
or self.data.projMat.shape[1] == cEff.shape[1]):
cfBase = np.zeros((len(idxBad), cEff.shape[1]),
dtype = cEff.dtype)
else:
cfBase = csr_matrix((len(idxBad),
self.data.coeffsEff[0].shape[1]),
dtype = cEff.dtype)
valMuMBad = p(musMCN[ptsBad])
for ijb, jb in enumerate(ptsBad):
self.data.coeffsEff[jb][idxBad] = copy(cfBase)
self.data.polesEff[jb][idxBad] = 0.
for ij, j in enumerate(pts):
val = valMuMBad[ij][ijb]
if not np.isclose(val, 0., atol = 1e-15):
self.data.coeffsEff[jb][idxBad] += (val
* self.data.coeffsEff[j][idxBad])
self.data.polesEff[jb][idxBad] += (val
* self.data.polesEff[j][idxBad])
warnings.filters.pop(0)
if pbM in ppb + rbpb:
approx.paramsMarginal["MMarginal"] = _MMarginalEff
vbMng(self, "DEL", "Done computing marginal interpolator.", 12)
def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs):
if hasattr(self, "_idxExcl"):
for j, excl in enumerate(self._idxExcl):
self.data.HIs.insert(excl, self._HIsExcl[j])
TrainedModelPivotedRationalNoMatch.updateEffectiveSamples(self,
exclude)
self._HIsExcl = []
for excl in self._idxExcl[::-1]:
self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl
poles = [hi.poles for hi in self.data.HIs]
coeffs = [hi.coeffs for hi in self.data.HIs]
self.initializeFromLists(poles, coeffs, self.data.Psupp,
self.data.HIs[0].polybasis, *args, **kwargs)
def initializeFromRational(self, *args, **kwargs):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, self.data.Psupp, basis, *args,
**kwargs)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny,
basis:str, matchingWeight:float, HFEngine:HFEng,
is_state:bool):
"""Initialize Heaviside representation."""
poles, coeffs = heavisideUniformShape(poles, coeffs)
poles, coeffs = rationalFunctionMatching(poles, coeffs,
self.data.musMarginal.data,
matchingWeight, supps,
self.data.projMat, HFEngine,
is_state, None)
self.data.HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
if len(cfs) == len(pls):
cfs = np.pad(cfs, ((0, 1), (0, 0)), "constant")
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
self.data.HIs += [hsi]
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(len(poles[0]), dtype = int)
def checkShared(self, shared:float, correction : str = "ERASE") -> str:
N = len(self.data.HIs[0].poles)
M = len(self.data.HIs)
correction = correction.upper().strip().replace(" ","")
if correction not in ["ERASE", "RATIONAL", "POLYNOMIAL"]:
RROMPyWarning(("Correction kind not recognized. Overriding to "
"'ERASE'."))
correction = "ERASE"
goodLocPoles = np.array([np.isinf(hi.poles) == False
for hi in self.data.HIs])
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = - np.ones(N, dtype = int)
goodGlobPoles = np.sum(goodLocPoles, axis = 0)
goodEnoughPoles = goodGlobPoles >= max(1., 1. * shared * M)
keepPole = np.where(goodEnoughPoles)[0]
halfPole = np.where(goodEnoughPoles * (goodGlobPoles < M))[0]
self.data.suppEffIdx[keepPole] = 0
for idxR in halfPole:
pts = np.where(goodLocPoles[:, idxR])[0]
idxEff = len(self.data.suppEffPts)
for idEff, prevPts in enumerate(self.data.suppEffPts):
if len(prevPts) == len(pts):
if np.allclose(prevPts, pts):
idxEff = idEff
break
if idxEff == len(self.data.suppEffPts):
self.data.suppEffPts += [pts]
self.data.suppEffIdx[idxR] = idxEff
degBad = len(self.data.HIs[0].coeffs) - N - 1
for pt in range(len(self.data.HIs)):
idxR = np.where(goodLocPoles[pt] * (goodEnoughPoles == False))[0]
self.removePoleResLocal(idxR, pt, degBad, correction, True)
return ("Hard-erased {} pole".format(N - len(keepPole))
+ "s" * (N - len(keepPole) != 1)
+ " and soft-erased {} pole".format(len(halfPole))
+ "s" * (len(halfPole) != 1) + ".")
def removePoleResLocal(self, badidx:List[int], margidx:int,
degcorr : int = None, correction : str = "ERASE",
hidden : bool = False):
if not hasattr(badidx, "__len__"): badidx = [badidx]
badidx = np.array(badidx)
if len(badidx) == 0: return
correction = correction.upper().strip().replace(" ","")
if correction not in ["ERASE", "RATIONAL", "POLYNOMIAL"]:
RROMPyWarning(("Correction kind not recognized. Overriding to "
"'ERASE'."))
correction = "ERASE"
if hidden:
N = len(self.data.HIs[margidx].poles)
else:
N = len(self.data.polesEff[margidx])
goodidx = [j for j in range(N) if j not in badidx]
if correction != "ERASE":
if degcorr is None:
if hidden:
degcorr = len(self.data.HIs[margidx].coeffs) - N - 1
else:
degcorr = self.data.coeffsEff[margidx].shape[0] - N - 1
muM, musEff = self.data.musMarginal[margidx], []
polybasis = self.data.HIs[margidx].polybasis
for mu in self.data.mus:
if np.allclose(mu(self.data.directionMarginal), muM):
musEff += [mu(self.data.directionPivot[0])]
musEff = self.centerNormalizePivot(musEff)
if hidden:
plsBad = self.data.HIs[margidx].poles[badidx]
else:
plsBad = self.data.polesEff[margidx][badidx, 0]
plsBadEff = np.isinf(plsBad) == False
plsBad, badidx = plsBad[plsBadEff], badidx[plsBadEff]
if hidden:
plsGood = self.data.HIs[margidx].poles[goodidx]
corrVals = heavival(musEff,
self.data.HIs[margidx].coeffs[badidx],
plsBad, polybasis).T
else:
plsGood = self.data.polesEff[margidx][goodidx]
corrVals = heavival(musEff,
self.data.coeffsEff[margidx].toarray()[badidx],
plsBad, polybasis).T
if correction == "RATIONAL":
hi = HI()
hi.setupByInterpolation(musEff, plsGood, corrVals, degcorr,
polybasis)
if hidden:
self.data.HIs[margidx].coeffs[goodidx] += (
hi.coeffs[: len(goodidx)])
else:
self.data.coeffsEff[margidx][goodidx, :] += (
hi.coeffs[: len(goodidx)])
polyCorr = hi.coeffs[len(goodidx) :]
elif correction == "POLYNOMIAL":
pi = PI()
pi.setupByInterpolation(musEff, corrVals, degcorr,
polybasis.split("_")[0])
polyCorr = pi.coeffs
if hidden:
self.data.HIs[margidx].coeffs[N : N + degcorr + 1] += polyCorr
else:
self.data.coeffsEff[margidx][N : N + degcorr + 1, :] += (
polyCorr)
if hidden:
self.data.HIs[margidx].poles[badidx] = np.inf
self.data.HIs[margidx].coeffs[badidx] = 0.
else:
self.data.polesEff[margidx] = self.data.polesEff[margidx][goodidx]
goodidx += list(range(N, self.data.coeffsEff[margidx].shape[0]))
self.data.coeffsEff[margidx] = (
self.data.coeffsEff[margidx][goodidx, :])
def removePoleResGlobal(self, badidx:List[int], degcorr : int = None,
correction : str = "ERASE", hidden : bool = False):
if not hasattr(badidx, "__len__"): badidx = [badidx]
if len(badidx) == 0: return
correction = correction.upper().strip().replace(" ","")
if correction not in ["ERASE", "RATIONAL", "POLYNOMIAL"]:
RROMPyWarning(("Correction kind not recognized. Overriding to "
"'ERASE'."))
correction = "ERASE"
for margidx in range(len(self.data.HIs)):
self.removePoleResLocal(badidx, margidx, degcorr, correction,
hidden)
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
muP = self.centerNormalizePivot(mu(self.data.directionPivot))
muM = mu(self.data.directionMarginal)
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
uAppR = hi(mP)[:, 0]
if i == 0:
uApproxR = np.empty((len(uAppR), len(mu)),
dtype = uAppR.dtype)
uApproxR[:, i] = uAppR
self.uApproxReduced = sampleList(uApproxR)
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant interpolator."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal models at mu = {}.".format(mu), 95)
his = []
muC = self.centerNormalizeMarginal(mu)
mIvals = self.data.marginalInterp(muC)
verb, self.verbosity = self.verbosity, 0
poless = self.interpolateMarginalPoles(mu, mIvals)
coeffss = self.interpolateMarginalCoeffs(mu, mIvals)
self.verbosity = verb
for j in range(len(mu)):
his += [HI()]
his[-1].poles = poless[j]
his[-1].coeffs = coeffss[j]
his[-1].npar = 1
his[-1].polybasis = self.data.HIs[0].polybasis
vbMng(self, "DEL", "Done interpolating marginal models.", 95)
return his
def interpolateMarginalPoles(self, mu : paramList = [],
mIvals : Np2D = None) -> ListAny:
"""Obtain interpolated approximant poles."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal poles at mu = {}.".format(mu), 95)
intMPoles = np.zeros((len(mu),) + self.data.polesEff[0].shape,
dtype = self.data.polesEff[0].dtype)
if mIvals is None:
muC = self.centerNormalizeMarginal(mu)
mIvals = self.data.marginalInterp(muC)
for pEff, mI in zip(self.data.polesEff, mIvals):
for j, m in enumerate(mI): intMPoles[j] += m * pEff
vbMng(self, "DEL", "Done interpolating marginal poles.", 95)
return intMPoles[..., 0]
def interpolateMarginalCoeffs(self, mu : paramList = [],
mIvals : Np2D = None) -> ListAny:
"""Obtain interpolated approximant coefficients."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal coefficients at mu = {}.".format(mu), 95)
intMCoeffs = np.zeros((len(mu),) + self.data.coeffsEff[0].shape,
dtype = self.data.coeffsEff[0].dtype)
if mIvals is None:
muC = self.centerNormalizeMarginal(mu)
mIvals = self.data.marginalInterp(muC)
for cEff, mI in zip(self.data.coeffsEff, mIvals):
for j, m in enumerate(mI): intMCoeffs[j] += m * cEff
vbMng(self, "DEL", "Done interpolating marginal coefficients.", 95)
return intMCoeffs
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
p = emptySampleList()
muP = self.centerNormalizePivot(mu(self.data.directionPivot))
muM = mu(self.data.directionMarginal)
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
Pval = hi(mP) * np.prod(mP[0] - hi.poles)
if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype)
p[i] = Pval
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
muP = self.centerNormalizePivot(mu(self.data.directionPivot))
muM = mu(self.data.directionMarginal)
if der is None:
derP, derM = 0, [0]
else:
derP = der[self.data.directionPivot[0]]
derM = [der[x] for x in self.data.directionMarginal]
if np.any(np.array(derM) != 0):
raise RROMPyException(("Derivatives of Q with respect to marginal "
"parameters not allowed."))
sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
derVal = np.zeros(len(mu), dtype = np.complex)
pls = self.interpolateMarginalPoles(muM)
for i, (mP, pl) in enumerate(zip(muP, pls)):
N = len(pl)
if derP == N: derVal[i] = 1.
elif derP >= 0 and derP < N:
plDist = mP[0] - pl
for terms in combinations(np.arange(N), N - derP):
derVal[i] += np.prod(plDist[list(terms)])
return sclP ** derP * fact(derP) * derVal
def getPoles(self, marginalVals : ListAny = [fp]) -> paramList:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mVals = list(marginalVals)
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)[0]
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = (self.data.scaleFactor[rDim]
* self.interpolateMarginalPoles(mMarg)[0])
return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim),
idx = [rDim])(0, 0)
+ roots, "B", [rDim])(0)
def getResidues(self, marginalVals : ListAny = [fp]) -> Tuple[paramList,
Np2D]:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
mVals = list(marginalVals)
pls = self.getPoles(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :].T
if not self.data._collapsed: res = dot(self.data.projMat, res).T
return pls, res

Event Timeline