Page MenuHomec4science

trained_model_pivoted.py
No OneTemporary

File Metadata

Created
Thu, Jun 6, 19:33

trained_model_pivoted.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from scipy.special import factorial as fact
from copy import deepcopy as copy
from itertools import combinations
from rrompy.reduction_methods.standard.trained_model import (
TrainedModelRational)
from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, ListAny,
paramVal, paramList, sampList, HFEng)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import (pointMatching, chordalMetricAdjusted,
cutOffDistance, reduceDegreeN)
from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.poly_fitting.moving_least_squares import (
MovingLeastSquaresInterpolator as MLSI)
from rrompy.utilities.poly_fitting.heaviside import (heavisideUniformShape,
rational2heaviside,
HeavisideInterpolator as HI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import emptySampleList
__all__ = ['TrainedModelPivoted']
class TrainedModelPivoted(TrainedModelRational):
"""
ROM approximant evaluation for pivoted approximants (with pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def centerNormalizePivot(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot.
Returns:
Normalized parameter.
"""
mu = checkParameterList(mu, self.data.nparPivot)[0]
if mu0 is None: mu0 = self.data.mu0Pivot
rad = ((mu ** self.data.rescalingExpPivot
- mu0 ** self.data.rescalingExpPivot)
/ self.data.scaleFactorPivot)
return rad
def centerNormalizeMarginal(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal.
Returns:
Normalized parameter.
"""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
if mu0 is None: mu0 = self.data.mu0Marginal
rad = ((mu ** self.data.rescalingExpMarginal
- mu0 ** self.data.rescalingExpMarginal)
/ self.data.scaleFactorMarginal)
return rad
def updateEffectiveSamples(self, HFEngine:HFEng,
exclude : List[int] = None,
matchingWeight : float = 1., POD : bool = True,
is_state : bool = True):
if hasattr(self, "_idxExcl"):
for j, excl in enumerate(self._idxExcl):
self.data.musMarginal.insert(self._musMExcl[j], excl)
self.data.HIs.insert(excl, self._HIsExcl[j])
self.data.Ps.insert(excl, self._PsExcl[j])
self.data.Qs.insert(excl, self._QsExcl[j])
if exclude is None: exclude = []
self._idxExcl, self._musMExcl = list(np.sort(exclude)), []
self._HIsExcl, self._PsExcl, self._QsExcl = [], [], []
for excl in self._idxExcl[::-1]:
self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl
self.data.musMarginal.pop(excl)
self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl
self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl
self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl
poles = [hi.poles for hi in self.data.HIs]
coeffs = [hi.coeffs for hi in self.data.HIs]
self.initializeFromLists(poles, coeffs, self.data.HIs[0].polybasis,
HFEngine, matchingWeight, POD, is_state)
def setupMarginalInterp(self, approx, interpPars:ListAny,
MMAuto:bool, rDWM : Np1D = None,
noWarnReduceAuto : bool = True):
vbMng(self, "INIT", "Starting computation of marginal interpolator.",
7)
musMCN = self.centerNormalizeMarginal(self.data.musMarginal)
pbM = approx.polybasisMarginal
if pbM not in ppb:
rDWMEf = np.array(rDWM)
self.data.marginalInterp = []
for ipts, pts in enumerate(self.data.suppEffPts):
mI = [None] * len(musMCN)
if len(pts) > 0:
musMCNEff = musMCN[pts]
if MMAuto:
if ipts > 0:
verb = approx.verbosity
approx.verbosity = 0
_musM = approx.musMarginal
approx.musMarginal = musMCNEff
approx._setMMarginalAuto()
if ipts > 0:
approx.musMarginal = _musM
approx.verbosity = verb
else:
MM = reduceDegreeN(approx.MMarginal, len(musMCNEff),
self.data.nparMarginal,
approx.polydegreetypeMarginal)
if MM < approx.MMarginal:
if ipts == 0 and not noWarnReduceAuto:
RROMPyWarning(("MMarginal too large compared to "
"SMarginal. Reducing MMarginal by "
"{}").format(approx.MMarginal - MM))
approx.MMarginal = MM
MMEff = approx.MMarginal
for j in range(len(musMCNEff)):
canonicalj = 1. * (np.arange(len(musMCNEff)) == j)
while MMEff >= 0 and (pbM in ppb
or rDWMEf[0] <= rDWM[0] * 2 ** 6):
pParRest = copy(interpPars)
if pbM in ppb:
p = PI()
else:
pParRest = [rDWMEf] + pParRest
p = RBI() if pbM in rbpb else MLSI()
wellCond, msg = p.setupByInterpolation(musMCNEff,
canonicalj, MMEff,
pbM, *pParRest)
vbMng(self, "MAIN", msg, 5)
if wellCond: break
if pbM in ppb:
vbMng(self, "MAIN",
("Polyfit is poorly conditioned. Reducing "
"MMarginal by 1."), 10)
MMEff -= 1
else:
vbMng(self, "MAIN",
("Polyfit is poorly conditioned. "
"Multiplying radialDirectionalWeightsMarginal "
"by 2."), 10)
rDWMEf *= 2.
if MMEff < 0 or (pbM not in ppb
and rDWMEf[0] > rDWM[0] * 2 ** 6):
raise RROMPyException(("Instability in computation of "
"interpolant. Aborting."))
if pbM in ppb: MMEff = approx.MMarginal
else: rDWMEf = np.array(rDWM)
mI[pts[j]] = copy(p)
self.data.marginalInterp += [mI]
_MMarginalEffective = approx.MMarginal
approx.MMarginal = _MMarginalEffective
vbMng(self, "DEL", "Done computing marginal interpolator.", 7)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str,
HFEngine:HFEng, matchingWeight : float = 1.,
POD : bool = True, is_state : bool = True):
"""Initialize Heaviside representation."""
musM = self.data.musMarginal
margAbsDist = np.sum(np.abs(np.repeat(musM.data, len(musM), 0)
- np.tile(musM.data, [len(musM), 1])
), axis = 1).reshape(len(musM), len(musM))
explored = [0]
unexplored = list(range(1, len(musM)))
poles, coeffs = heavisideUniformShape(poles, coeffs)
N = len(poles[0])
for _ in range(1, len(musM)):
minIdx = np.argmin(np.concatenate([margAbsDist[ex, unexplored] \
for ex in explored]))
minIex = explored[minIdx // len(unexplored)]
minIunex = unexplored[minIdx % len(unexplored)]
resex = coeffs[minIex][: N]
resunex = coeffs[minIunex][: N]
if matchingWeight != 0 and not POD:
resex = self.data.projMat.dot(resex.T)
resunex = self.data.projMat.dot(resunex.T)
dist = chordalMetricAdjusted(poles[minIex], poles[minIunex],
matchingWeight, resex, resunex,
HFEngine, is_state)
reordering = pointMatching(dist)[1]
poles[minIunex] = poles[minIunex][reordering]
coeffs[minIunex][: N] = coeffs[minIunex][reordering]
explored += [minIunex]
unexplored.remove(minIunex)
HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
HIs += [hsi]
self.data.HIs = HIs
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(N, dtype = int)
def initializeFromRational(self, HFEngine:HFEng,
matchingWeight : float = 1., POD : bool = True,
is_state : bool = True):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, basis, HFEngine,
matchingWeight, POD, is_state)
def recompressByCutOff(self, tol : float = np.inf, kind : str = "STANDARD",
murange : Tuple[float, float] = [- 1., 1.]) -> str:
N = len(self.data.HIs[0].poles)
mu0 = np.mean(murange)
goodLocPoles = np.array([cutOffDistance(hi.poles, murange) <= tol
for hi in self.data.HIs])
self.data.suppEffPts = [np.arange(len(self.data.HIs))]
self.data.suppEffIdx = np.zeros(N, dtype = int)
if np.all(goodLocPoles):
return " No poles erased."
kind = kind.upper().strip().replace(" ","")
goodAllPoles = np.all(goodLocPoles, axis = 0)
badPoles = np.logical_not(goodAllPoles)
if kind == "HARD":
keepPole = np.where(goodAllPoles)[0]
halfPole = np.empty(0, dtype = int)
removePole = np.where(badPoles)[0]
elif kind == "SOFT":
goodSomePoles = np.any(goodLocPoles, axis = 0)
keepPole = np.where(goodSomePoles)[0]
halfPole = np.where(np.logical_and(badPoles, goodSomePoles))[0]
removePole = np.where(np.logical_not(goodSomePoles))[0]
else:
raise RROMPyException("Cutoff kind not recognized.")
if len(removePole) > 0:
keepCoeff = np.append(keepPole, np.append([N],
np.arange(N + 1, len(self.data.HIs[0].coeffs))))
for hi in self.data.HIs:
polyCorrection = np.zeros_like(hi.coeffs[0, :])
for j in removePole:
if not np.isinf(hi.poles[j]):
polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j])
if len(hi.coeffs) == N:
hi.coeffs = np.vstack((hi.coeffs, polyCorrection))
else:
hi.coeffs[N, :] += polyCorrection
hi.poles = hi.poles[keepPole]
hi.coeffs = hi.coeffs[keepCoeff, :]
for idxR in halfPole:
pts = np.where(goodLocPoles[:, idxR])[0]
idxEff = len(self.data.suppEffPts)
for idEff, prevPts in enumerate(self.data.suppEffPts):
if len(prevPts) == len(pts):
if np.allclose(prevPts, pts):
idxEff = idEff
break
if idxEff == len(self.data.suppEffPts):
self.data.suppEffPts += [pts]
self.data.suppEffIdx[idxR] = idxEff
self.data.suppEffIdx = self.data.suppEffIdx[keepPole]
return (" Hard-erased {} pole".format(len(removePole))
+ "s" * (len(removePole) != 1)
+ " and soft-erased {} pole".format(len(halfPole))
+ "s" * (len(halfPole) != 1) + ".")
def _interpolateMarginal(self, muC : paramList, objs : ListAny) -> Np2D:
res = np.zeros(objs[0].shape + (len(muC),), dtype = objs[0].dtype)
for suppIdx in range(len(self.data.suppEffPts)):
i = np.where(self.data.suppEffIdx == suppIdx)[0]
if suppIdx == 0:
i = np.append(i, np.arange(len(self.data.suppEffIdx),
len(res)))
if len(i) > 0:
for mIj, obj in zip(self.data.marginalInterp[suppIdx], objs):
if mIj is not None:
res[i] += np.expand_dims(obj[i], - 1) * mIj(muC)
return res
def interpolateMarginalInterpolator(self, mu : paramVal = []) -> Np1D:
"""Obtain interpolated approximant interpolator."""
mu = checkParameter(mu, self.data.nparMarginal)[0]
hsi = HI()
hsi.poles = self.interpolateMarginalPoles(mu)[..., 0]
hsi.coeffs = self.interpolateMarginalCoeffs(mu)[..., 0]
hsi.npar = 1
hsi.polybasis = self.data.HIs[0].polybasis
return hsi
def interpolateMarginalPoles(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant poles."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
muC = self.centerNormalizeMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal poles at mu = {}.".format(mu), 95)
intMPoles = self._interpolateMarginal(muC,
[hi.poles for hi in self.data.HIs])
vbMng(self, "DEL", "Done interpolating marginal poles.", 95)
return intMPoles
def interpolateMarginalCoeffs(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant coefficients."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
muC = self.centerNormalizeMarginal(mu)
vbMng(self, "INIT",
"Interpolating marginal coefficients at mu = {}.".format(mu), 95)
intMCoeffs = self._interpolateMarginal(muC,
[hi.coeffs for hi in self.data.HIs])
vbMng(self, "DEL", "Done interpolating marginal coefficients.", 95)
return intMCoeffs
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
self.uApproxReduced = emptySampleList()
for i, muPL in enumerate(mu):
muL = self.centerNormalizePivot([muPL(0, x) \
for x in self.data.directionPivot])
muM = [muPL(0, x) for x in self.data.directionMarginal]
vbMng(self, "INIT",
"Assembling reduced model for mu = {}.".format(muPL), 87)
hsL = self.interpolateMarginalInterpolator(muM)
vbMng(self, "DEL", "Done assembling reduced model.", 87)
uAppR = hsL(muL)
if i == 0:
self.uApproxReduced.reset((len(uAppR), len(mu)),
dtype = uAppR.dtype)
self.uApproxReduced[i] = uAppR
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
p = emptySampleList()
p.reset((len(self.data.HIs[0].coeffs.shape[1]), len(mu)))
for i, muPL in enumerate(mu):
muL = self.centerNormalizePivot([muPL(0, x) \
for x in self.data.directionPivot])
muM = [muPL(0, x) for x in self.data.directionMarginal]
hsL = self.interpolateMarginalInterpolator(muM)
p[i] = hsL(muL) * np.prod(muL(0, 0) - hsL.poles)
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
muP = self.centerNormalizePivot(checkParameterList(
mu.data[:, self.data.directionPivot],
self.data.nparPivot)[0])
muM = checkParameterList(mu.data[:, self.data.directionMarginal],
self.data.nparMarginal)[0]
if der is None:
derP, derM = 0, [0]
else:
derP = der[self.data.directionPivot[0]]
derM = [der[x] for x in self.data.directionMarginal]
if np.any(np.array(derM) != 0):
raise RROMPyException(("Derivatives of Q with respect to marginal "
"parameters not allowed."))
sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
derVal = np.zeros(len(mu), dtype = np.complex)
N = len(self.data.HIs[0].poles)
if derP == N: derVal[:] = 1.
elif derP >= 0 and derP < N:
pls = self.interpolateMarginalPoles(muM).T
plsDist = muP.data.reshape(-1, 1) - pls
for terms in combinations(np.arange(N), N - derP):
derVal += np.prod(plsDist[:, list(terms)], axis = 1)
return sclP ** derP * fact(derP) * derVal
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
try:
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise
except:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = self.interpolateMarginalPoles(mMarg)[..., 0]
return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim]
+ self.data.scaleFactor[rDim] * roots,
1. / self.data.rescalingExp[rDim])
def getResidues(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
elif len(args) == 0:
mVals = [None]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
residues = self.interpolateMarginalCoeffs(mMarg)[: len(pls), :, 0]
res = self.data.projMat.dot(residues.T)
return pls, res

Event Timeline