Page MenuHomec4science

trained_model_pivoted_general_pole_matching.py
No OneTemporary

File Metadata

Created
Thu, May 16, 20:34

trained_model_pivoted_general_pole_matching.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from .trained_model import TrainedModel
from rrompy.utilities.base.types import (Np1D, Tuple, ListAny, paramVal,
paramList, sampList, HFEng)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import pointMatching
from rrompy.utilities.poly_fitting.heaviside import HeavisideInterpolator as HI
from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import emptySampleList
__all__ = ['TrainedModelPivotedGeneralPoleMatching']
class TrainedModelPivotedGeneralPoleMatching(TrainedModel):
"""
ROM approximant evaluation for pivoted approximants with pole matching.
Attributes:
Data: dictionary with all that can be pickled.
"""
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str,
HFEngine:HFEng, matchingWeight : float = 1.,
POD : bool = True):
"""Initialize Heaviside representation."""
musM = self.data.musMarginal
margAbsDist = np.sum(np.abs(np.tile(musM.data.reshape(len(musM),1,-1),
[1, len(musM), 1])
- np.tile(musM.data.reshape(1,len(musM),-1),
[len(musM), 1, 1])), axis = 2)
N = len(poles[0])
explored = [0]
unexplored = list(range(1, len(musM)))
for _ in range(1, len(musM)):
minIdx = np.argmin(np.concatenate([margAbsDist[ex, unexplored] \
for ex in explored]))
minIex = explored[minIdx // len(unexplored)]
minIunex = unexplored[minIdx % len(unexplored)]
dist = np.abs(np.tile(poles[minIex].reshape(-1, 1), N)
- poles[minIunex].reshape(1, -1))
if matchingWeight != 0:
resex = coeffs[minIex][: N]
resunex = coeffs[minIunex][: N]
if POD:
distR = resex.dot(resunex.T.conj())
distR = (distR.T / np.linalg.norm(resex, axis = 1)).T
distR = distR / np.linalg.norm(resunex, axis = 1)
else:
resex = self.data.projMat.dot(resex.T)
resunex = self.data.projMat.dot(resunex.T)
distR = HFEngine.innerProduct(resex, resunex).T
distR = (distR.T / HFEngine.norm(resex)).T
distR = distR / HFEngine.norm(resunex)
distR = np.abs(distR)
distR[distR > 1.] = 1.
dist += 2. / np.pi * matchingWeight * np.arccos(distR)
reordering = pointMatching(dist, verbObj = self)
poles[minIunex] = poles[minIunex][reordering]
coeffs[minIunex][: N] = coeffs[minIunex][reordering]
explored += [minIunex]
unexplored.remove(minIunex)
# explored = np.append(explored, minIunex)
# unexplored = unexplored[unexplored != minIunex]
HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
HIs += [hsi]
self.data.HIs = HIs
def recompressByCutOff(self, murange : Tuple[float, float] = [- 1., 1.],
tol : float = np.inf, rtype : str = "MAGNITUDE"):
if np.isinf(tol): return " No poles erased."
N = len(self.data.HIs[0].poles)
mu0 = np.mean(murange)
musig = murange[0] - mu0
if np.isclose(musig, 0.):
radius = lambda x: np.abs(x - mu0)
else:
if rtype == "MAGNITUDE":
murdir = (murange[0] - mu0) / np.abs(musig)
def radius(x):
scalprod = (x - mu0) * murdir.conj() / np.abs(musig)
rescalepar = np.abs(np.real(scalprod))
rescaleort = np.abs(np.imag(scalprod))
return ((rescalepar - 1.) ** 2. * (rescalepar > 1.)
+ rescaleort ** 2.) ** .5
else:#if rtype == "POTENTIAL":
def radius(x):
rescale = (x - mu0) / musig
return np.max(np.abs(rescale * np.array([-1., 1.])
+ (rescale ** 2. - 1) ** .5)) - 1.
keepPole, removePole = [], []
for j in range(N):
for hi in self.data.HIs:
if radius(hi.poles[j]) <= tol:
keepPole += [j]
break
if len(keepPole) == 0 or keepPole[-1] != j: removePole += [j]
if len(keepPole) == N: return " No poles erased."
keepCoeff = keepPole + [N]
keepCoeff = keepCoeff + list(range(N + 1,len(self.data.HIs[0].coeffs)))
for hi in self.data.HIs:
polyCorrection = np.zeros_like(hi.coeffs[0, :])
for j in removePole:
polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j])
if len(hi.coeffs) == N:
hi.coeffs = np.vstack((hi.coeffs, polyCorrection))
else:
hi.coeffs[N, :] += polyCorrection
hi.poles = hi.poles[keepPole]
hi.coeffs = hi.coeffs[keepCoeff, :]
return (" Erased {} pole".format(len(removePole))
+ "s" * (len(removePole) > 1) + ".")
def interpolateMarginalInterpolator(self, mu : paramVal = []) -> Np1D:
"""Obtain interpolated approximant interpolator."""
mu = checkParameter(mu, self.data.nparMarginal)[0]
hsi = HI()
hsi.poles = self.interpolateMarginalPoles(mu)
hsi.coeffs = self.interpolateMarginalCoeffs(mu)
hsi.npar = 1
hsi.polybasis = self.data.HIs[0].polybasis
return hsi
def interpolateMarginalPoles(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant poles."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
return self.interpolateMarginal(mu, [hi.poles for hi in self.data.HIs])
def interpolateMarginalCoeffs(self, mu : paramList = []) -> Np1D:
"""Obtain interpolated approximant coefficients."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
cs = self.interpolateMarginal(mu, [hi.coeffs for hi in self.data.HIs])
if isinstance(cs, (list, tuple,)): cs = np.array(cs)
return cs.reshape(self.data.HIs[0].coeffs.shape)
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
self.uApproxReduced = emptySampleList()
self.uApproxReduced.reset((self.data.HIs[0].coeffs.shape[1],
len(mu)), self.data.projMat[0].dtype)
for i, muPL in enumerate(mu):
muL = self.centerNormalizePivot([muPL(0, x) \
for x in self.data.directionPivot])
muM = [muPL(0, x) for x in self.data.directionMarginal]
vbMng(self, "INIT",
"Assembling reduced model for mu = {}.".format(muPL), 87)
hsL = self.interpolateMarginalInterpolator(muM)
vbMng(self, "DEL", "Done assembling reduced model.", 87)
self.uApproxReduced[i] = hsL(muL)
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
try:
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise
except:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = np.array(self.interpolateMarginalPoles(mMarg))
return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim]
+ self.data.scaleFactor[rDim] * roots,
1. / self.data.rescalingExp[rDim])
def getResidues(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
residues = self.interpolateMarginalCoeffs(mMarg)[: len(pls)]
res = self.data.projMat.dot(residues)
return pls, res

Event Timeline