diff --git a/examples/1_symmetric_disk/symmetric_disk.py b/examples/1_symmetric_disk/symmetric_disk.py
index 7cd9be6..0b57b65 100644
--- a/examples/1_symmetric_disk/symmetric_disk.py
+++ b/examples/1_symmetric_disk/symmetric_disk.py
@@ -1,86 +1,86 @@
import numpy as np
from symmetric_disk_engine import SymmetricDiskEngine as engine
from rrompy.sampling import SamplingEngineStandard as SES
from rrompy.reduction_methods import (
NearestNeighbor as NN, RationalInterpolant as RI, ReducedBasis as RB,
RationalInterpolantGreedy as RIG, ReducedBasisGreedy as RBG)
from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
ks = [10., 20.]
k0, n = np.mean(np.power(ks, 2.)) ** .5, 150
solver = engine(k0, n)
k = 12.
for method in ["RI", "RB", "RI_GREEDY", "RB_GREEDY"]:
print("Testing {} method".format(method))
if method == "RI":
params = {'S':40, 'POD':True, 'polybasis':"CHEBYSHEV",
'sampler':QS(ks, "CHEBYSHEV", scalingExp = 2.)}
algo = RI
if method == "RB":
params = {'S':40, 'POD':True,
'sampler':QS(ks, "CHEBYSHEV", scalingExp = 2.)}
algo = RB
if method == "RI_GREEDY":
params = {'S':10, 'POD':True, 'polybasis':"LEGENDRE", 'greedyTol':1e-2,
'sampler':QS(ks, "UNIFORM", scalingExp = 2.),
'errorEstimatorKind':"DISCREPANCY",
'trainSetGenerator':QS(ks, "CHEBYSHEV", scalingExp = 2.)}
algo = RIG
if method == "RB_GREEDY":
params = {'S':10, 'POD':True, 'greedyTol':1e-2,
'sampler':QS(ks, "UNIFORM", scalingExp = 2.),
'trainSetGenerator':QS(ks, "CHEBYSHEV", scalingExp = 2.)}
algo = RBG
approx = algo(solver, mu0 = k0, approx_state = True,
approxParameters = params, verbosity = 20)
if len(method) == 2:
approx.setupApprox()
else:
approx.setupApprox("LAST")
print("--- Approximant ---")
approx.plotApprox(k, name = 'u_app')
approx.plotHF(k, name = 'u_HF')
approx.plotErr(k, name = 'err_app')
approx.plotRes(k, name = 'res_app')
normErr = approx.normErr(k)[0]
normSol = approx.normHF(k)[0]
normRes = approx.normRes(k)[0]
normRHS = approx.normRHS(k)[0]
print("SolNorm:\t{:.5e}\nErr_app: \t{:.5e}\nErrRel_app:\t{:.5e}".format(
normSol, normErr, normErr / normSol))
print("RHSNorm:\t{:.5e}\nRes_app: \t{:.5e}\nResRel_app:\t{:.5e}".format(
normRHS, normRes, normRes / normRHS))
print("--- Closest snapshot ---")
eng = SES(solver, verbosity = 0)
approxNN = NN(solver, mu0 = k0, approx_state = True, verbosity = 0,
approxParameters = {'S':approx.S, 'POD':True})
approxNN.setSamples(approx.samplingEngine)
approxNN.plotApprox(k, name = 'u_close')
approxNN.plotHF(k, name = 'u_HF')
approxNN.plotErr(k, name = 'err_close')
approxNN.plotRes(k, name = 'res_close')
normErr = approxNN.normErr(k)[0]
normSol = approxNN.normHF(k)[0]
normRes = approxNN.normRes(k)[0]
normRHS = approxNN.normRHS(k)[0]
print("SolNorm:\t{:.5e}\nErr_close:\t{:.5e}\nErrRel_close:\t{:.5e}".format(
normSol, normErr, normErr / normSol))
print("RHSNorm:\t{:.5e}\nRes_close:\t{:.5e}\nResRel_close:\t{:.5e}".format(
normRHS, normRes, normRes / normRHS))
if method[:2] == "RI":
poles, residues = approx.getResidues()
if method[:2] == "RB":
poles = approx.getPoles()
print("Poles:\n{}".format(poles))
if method[:2] == "RI":
- for pol, res in zip(poles, residues.T):
+ for pol, res in zip(poles, residues):
solver.plot(res)
print("pole = {:.5e}".format(pol))
print("\n")
diff --git a/examples/2_double_slit/double_slit.py b/examples/2_double_slit/double_slit.py
index 45e0136..cbd7840 100644
--- a/examples/2_double_slit/double_slit.py
+++ b/examples/2_double_slit/double_slit.py
@@ -1,83 +1,83 @@
import numpy as np
from double_slit_engine import DoubleSlitEngine as engine
from rrompy.sampling import SamplingEngineStandard as SES
from rrompy.reduction_methods import (NearestNeighbor as NN,
RationalInterpolant as RI,
RationalInterpolantGreedy as RIG)
from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
from rrompy.solver.fenics import interp_project
ks = [10., 15.]
k0, n = np.mean(ks), 150
solver = engine(k0, n)
k = 11.
for method in ["RI", "RI_GREEDY"]:
print("Testing {} method".format(method))
if method == "RI":
params = {'S':20, 'POD':True, 'polybasis':"CHEBYSHEV",
'sampler':QS(ks, "CHEBYSHEV")}
algo = RI
if method == "RI_GREEDY":
params = {'S':10, 'POD':True, 'polybasis':"LEGENDRE", 'greedyTol':1e-2,
'sampler':QS(ks, "UNIFORM"),
'errorEstimatorKind':"LOOK_AHEAD",
'trainSetGenerator':QS(ks, "CHEBYSHEV")}
algo = RIG
approx = algo(solver, mu0 = k0, approx_state = True,
approxParameters = params, verbosity = 20)
if len(method) == 2:
approx.setupApprox()
else:
approx.setupApprox("LAST")
print("--- Approximant ---")
approx.plotApprox(k, name = 'u_app')
approx.plotHF(k, name = 'u_HF')
approx.plotErr(k, name = 'err_app')
approx.plotRes(k, name = 'res_app')
normErr = approx.normErr(k)[0]
normSol = approx.normHF(k)[0]
normRes = approx.normRes(k)[0]
normRHS = approx.normRHS(k)[0]
print("SolNorm:\t{:.5e}\nErr_app: \t{:.5e}\nErrRel_app:\t{:.5e}".format(
normSol, normErr, normErr / normSol))
print("RHSNorm:\t{:.5e}\nRes_app: \t{:.5e}\nResRel_app:\t{:.5e}".format(
normRHS, normRes, normRes / normRHS))
print("--- Closest snapshot ---")
eng = SES(solver, verbosity = 0)
approxNN = NN(solver, mu0 = k0, approx_state = True, verbosity = 0,
approxParameters = {'S':approx.S, 'POD':True})
approxNN.setSamples(approx.samplingEngine)
approxNN.plotApprox(k, name = 'u_close')
approxNN.plotHF(k, name = 'u_HF')
approxNN.plotErr(k, name = 'err_close')
approxNN.plotRes(k, name = 'res_close')
normErr = approxNN.normErr(k)[0]
normSol = approxNN.normHF(k)[0]
normRes = approxNN.normRes(k)[0]
normRHS = approxNN.normRHS(k)[0]
print("SolNorm:\t{:.5e}\nErr_close:\t{:.5e}\nErrRel_close:\t{:.5e}".format(
normSol, normErr, normErr / normSol))
print("RHSNorm:\t{:.5e}\nRes_close:\t{:.5e}\nResRel_close:\t{:.5e}".format(
normRHS, normRes, normRes / normRHS))
uIncR, uIncI = solver.getDirichletValues(k)
uIncR = interp_project(uIncR, solver.V)
uIncI = interp_project(uIncI, solver.V)
uInc = np.array(uIncR.vector()) + 1.j * np.array(uIncI.vector())
uEx = approx.getHF(k)[0] - uInc
uApp = approx.getApprox(k)[0] - uInc
solver.plot(uEx, name = 'uex_tot')
solver.plot(uApp, name = 'u_app_tot')
poles, residues = approx.getResidues()
print("Poles:\n{}".format(poles))
- for pol, res in zip(poles, residues.T):
+ for pol, res in zip(poles, residues):
solver.plot(res)
print("pole = {:.5e}".format(pol))
print("\n")
diff --git a/examples/6_boundary_value_problem_1D/boundary_value_problem_1D.py b/examples/6_boundary_value_problem_1D/boundary_value_problem_1D.py
index 6bedb03..0cf9f49 100644
--- a/examples/6_boundary_value_problem_1D/boundary_value_problem_1D.py
+++ b/examples/6_boundary_value_problem_1D/boundary_value_problem_1D.py
@@ -1,103 +1,103 @@
import warnings
import numpy as np
import matplotlib.pyplot as plt
from boundary_value_problem_1D_engine import BVP1DEngine as engine, BVP1DPoles
from rrompy.reduction_methods import (RationalInterpolant as RI,
RationalInterpolantGreedy as RIG)
from rrompy.parameter.parameter_sampling import (QuadratureBoxSampler as QBS,
QuadratureSampler as QS)
from rrompy.utilities.numerical import potential
ks, shift = [0., 5e3], 200.j
ksRatio = 2. * np.abs(shift) / np.abs(ks[1] - ks[0])
k0, gamma, n = np.mean(ks), .1, 10000
solver = engine(gamma, n, shift)
kEffMult = .1
kEff = np.real([ks[0] - kEffMult * (ks[1] - ks[0]),
ks[1] + kEffMult * (ks[1] - ks[0])])
methods = ["25", "30", "35", "10+", "15+"]
for kind in ["SEGMENT", "CLOUD"]:
print("Testing sampling kind {}...".format(kind))
errAbs, errRel = [], []
for method in methods:
print("S = {}".format(method))
if len(method) == 2:
if kind == "SEGMENT":
sampler = QS(ks, "CHEBYSHEV")
else: #if kind == "CLOUD":
sampler = QBS(ks, "CHEBYSHEV", ksRatio)
params = {'S':int(method), 'POD':True,
'polybasis':"CHEBYSHEV", 'sampler':sampler}
algo = RI
if method[-1] == "+":
if kind == "SEGMENT":
sampler = QS(ks, "UNIFORM")
trainSetGenerator = QS(ks, "CHEBYSHEV")
else: #if kind == "CLOUD":
sampler = QBS(ks, "UNIFORM", ksRatio)
trainSetGenerator = QBS(ks, "CHEBYSHEV", ksRatio)
params = {'S':int(method[: -1]), 'POD':True,
'polybasis':"LEGENDRE", 'greedyTol':1e-3,
'sampler':sampler, 'errorEstimatorKind':"LOOK_AHEAD",
'nTestPoints':1000,
'trainSetGenerator':trainSetGenerator}
algo = RIG
approx = algo(solver, mu0 = k0, approx_state = True,
approxParameters = params, verbosity = 0)
approx.setupApprox()
- poles, residues = approx.getResidues()
+ poles = approx.getPoles()
polesEff = poles[np.logical_and(np.real(poles) >= kEff[0],
np.real(poles) <= kEff[1])]
polesEx = BVP1DPoles(gamma, kEff[0], kEff[1], shift)
polesExTop = np.sort(polesEx[
np.logical_and(np.logical_and(np.real(polesEx) >= np.real(ks[0]),
np.real(polesEx) <= np.real(ks[1])),
np.imag(polesEx) > - np.imag(shift))])
polesExBot = np.sort(polesEx[
np.logical_and(np.logical_and(np.real(polesEx) >= np.real(ks[0]),
np.real(polesEx) <= np.real(ks[1])),
np.imag(polesEx) < - np.imag(shift))])
polesEx = np.append(np.append(polesExTop, np.inf), polesExBot)
polesExErr = np.abs(np.tile(polesEx.reshape(-1, 1), [1, len(poles)])
- poles.reshape(1,-1))
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
if method[-1] == "+":
print("S_eff = {}".format(approx.S))
ax.plot(approx.muTest.re.data.flatten(),
approx.muTest.im.data.flatten(), 'k,', alpha = 0.25)
ax.plot(approx.mus.re.data.flatten(),
approx.mus.im.data.flatten(), 'k.')
ax.plot(np.real(polesEff), np.imag(polesEff), 'r+')
ax.plot(np.real(polesEx), np.imag(polesEx), 'bx')
ax.set_xlim(kEff)
ax.grid()
plt.tight_layout()
plt.show()
errAbs += [np.min(polesExErr, axis = 1)]
with warnings.catch_warnings():
warnings.simplefilter("ignore")
potEx = potential(approx.trainedModel.centerNormalize(polesEx)(0),
sampler.normalFoci()) / sampler.groundPotential()
potEx[potEx < 1.] = 1.
potEx[len(potEx) // 2] = np.nan
errRel += [2. / np.abs(ks[1] - ks[0]) * errAbs[-1] / potEx]
symbols = '.x+dso^v<>'
fig = plt.figure(figsize = plt.figaspect(.5))
ax1 = fig.add_subplot(1, 2, 1)
ax2 = fig.add_subplot(1, 2, 2)
for j in range(len(methods)):
ax1.semilogy(np.real(polesEx), errAbs[j], '{}-'.format(symbols[j]))
ax2.semilogy(np.real(polesEx), errRel[j], '{}-'.format(symbols[j]))
ax1.set_title("Absolute")
ax1.grid()
ax2.set_title("Relative")
ax2.legend(["S = {}".format(m) for m in methods])
ax2.grid()
plt.tight_layout()
plt.show()
print("\n")
diff --git a/examples/7_MHD/mhd.py b/examples/7_MHD/mhd.py
index 8e58b3d..02b90fb 100644
--- a/examples/7_MHD/mhd.py
+++ b/examples/7_MHD/mhd.py
@@ -1,73 +1,73 @@
import numpy as np
import matplotlib.pyplot as plt
from mhd_engine import MHDEngine as engine
from rrompy.reduction_methods import (RationalInterpolant as RI,
RationalInterpolantGreedy as RIG)
from rrompy.parameter.parameter_sampling import (FFTSampler as FFTS,
QuadratureCircleSampler as QCS,
QuadratureBoxSampler as QBS)
ks = [-.35 + .5j, 0. + .5j]
k0 = np.mean(ks)
solver = engine(5)
kEffDelta = .1 * (ks[1] - ks[0])
kEff = np.real([ks[0] - kEffDelta, ks[1] + kEffDelta])
iEff = kEff - .5 * np.sum(np.real(ks)) + np.imag(ks[0])
nPoles = 50
polesEx = solver.getPolesExact(nPoles, k0)
for corrector in [False, True]:
for method in ["FFT", "BOX", "GREEDY"]:
print("Testing {} method with{} corrector".format(method,
"out" * (not corrector)))
if method == "FFT":
params = {'S':64, 'POD':True, 'polybasis':"MONOMIAL",
'sampler':FFTS(ks), 'correctorTol':1e-5}
algo = RI
if method == "BOX":
params = {'S':64, 'POD':True, 'polybasis':"MONOMIAL",
'sampler':QBS(ks), 'correctorTol':1e-5}
algo = RI
if method == "GREEDY":
params = {'S':30, 'POD':True, 'greedyTol':1e-2,
'polybasis':"MONOMIAL", 'sampler':QCS(ks),
'errorEstimatorKind':"LOOK_AHEAD", 'nTestPoints':10000,
'trainSetGenerator':FFTS(ks), 'correctorTol':1e-5}
algo = RIG
params['correctorForce'] = corrector
approx = algo(solver, mu0 = k0, approx_state = True,
approxParameters = params, verbosity = 10)
approx.setupApprox()
poles, residues = approx.getResidues()
inRange = np.logical_and(
np.logical_and(np.real(poles) >= kEff[0], np.real(poles) <= kEff[1]),
np.logical_and(np.imag(poles) >= iEff[0], np.imag(poles) <= iEff[1]))
polesEff = poles[inRange]
- resNormEff = np.linalg.norm(residues, axis = 0)[inRange]
+ resNormEff = np.linalg.norm(residues, axis = 1)[inRange]
rLm = np.min(np.log(resNormEff))
rLmM = np.max(np.log(resNormEff)) - rLm
fig = plt.figure(figsize = (10, 10))
ax = fig.add_subplot(1, 1, 1)
if method == "GREEDY":
ax.plot(approx.muTest.re.data.flatten(),
approx.muTest.im.data.flatten(), 'k,', alpha = 0.25)
for pl, rN in zip(polesEff, resNormEff):
if corrector:
alpha = .35 + .4 * (np.log(rN) - rLm) / rLmM
else:
alpha = .1 + .65 * (np.log(rN) - rLm) / rLmM
ax.annotate("{0:.0e}".format(rN), (np.real(pl), np.imag(pl)),
alpha = alpha)
ax.plot(np.real(pl), np.imag(pl), 'r+', alpha = alpha + .25)
ax.plot(approx.mus.re.data.flatten(),
approx.mus.im.data.flatten(), 'k.')
ax.plot(np.real(polesEx), np.imag(polesEx), 'bx')
ax.set_xlim(kEff)
ax.set_ylim(iEff)
ax.grid()
plt.tight_layout()
plt.show()
print("\n")
diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
index 088c5b8..4f3913b 100644
--- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
+++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
@@ -1,353 +1,352 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
from scipy.special import factorial as fact
from itertools import combinations
from rrompy.reduction_methods.standard.trained_model.trained_model_rational \
import TrainedModelRational
-from rrompy.utilities.base.types import (Np1D, List, ListAny, paramVal,
+from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal,
paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import dot
from rrompy.utilities.numerical.compress_matrix import compressMatrix
from rrompy.utilities.numerical.point_matching import potential
from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside,
HeavisideInterpolator as HI)
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList
from rrompy.sampling import emptySampleList
__all__ = ['TrainedModelPivotedRationalNoMatch']
class TrainedModelPivotedRationalNoMatch(TrainedModelRational):
"""
ROM approximant evaluation for pivoted approximants based on interpolation
of rational approximants (without pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def compress(self, collapse : bool = False, tol : float = 0., *args,
**kwargs):
if not collapse and tol <= 0.: return
RMat = self.data.projMat
if not collapse:
if hasattr(self.data, "_compressTol"):
RROMPyWarning(("Recompressing already compressed model is "
"ineffective. Aborting."))
return
self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args,
**kwargs)
for obj, suppj in zip(self.data.HIs, self.data.Psupp):
if suppj is None:
obj.postmultiplyTensorize(RMat.T)
else:
obj.postmultiplyTensorize(RMat.T[suppj[0] : suppj[1]])
if hasattr(self, "_HIsExcl"):
for obj, suppj in zip(self._HIsExcl, self.data.Psupp):
if suppj is None:
obj.postmultiplyTensorize(RMat.T)
else:
obj.postmultiplyTensorize(RMat.T[suppj[0] : suppj[1]])
if hasattr(self.data, "Ps"):
for obj, suppj in zip(self.data.Ps, self.data.Psupp):
if suppj is None:
obj.postmultiplyTensorize(RMat.T)
else:
obj.postmultiplyTensorize(RMat.T[suppj[0] : suppj[1]])
if hasattr(self, "_PsExcl"):
for obj, suppj in zip(self.data._PsExcl, self.data._PsuppExcl):
if suppj is None:
obj.postmultiplyTensorize(RMat.T)
else:
obj.postmultiplyTensorize(RMat.T[suppj[0] : suppj[1]])
if hasattr(self.data, "coeffsEff"):
for j in range(len(self.data.coeffsEff)):
self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T)
if hasattr(self, "_HIsExcl") or hasattr(self, "_PsExcl"):
self.data._PsuppExcl = [None] * len(self.data._PsuppExcl)
self.data.Psupp = [None] * len(self.data.Psupp)
super(TrainedModelRational, self).compress(collapse, tol)
def centerNormalizePivot(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot.
Returns:
Normalized parameter.
"""
mu = checkParameterList(mu, self.data.nparPivot)[0]
if mu0 is None: mu0 = self.data.mu0Pivot
rad = ((mu ** self.data.rescalingExpPivot
- mu0 ** self.data.rescalingExpPivot)
/ self.data.scaleFactorPivot)
return rad
def setupMarginalInterp(self, rDWM : Np1D = None):
self.data.NN = NNI()
self.data.NN.setupByInterpolation(self.data.musMarginal,
np.arange(len(self.data.musMarginal)),
1, rDWM)
def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs):
if hasattr(self, "_idxExcl"):
for j, excl in enumerate(self._idxExcl):
self.data.musMarginal.insert(self._musMExcl[j], excl)
self.data.HIs.insert(excl, self._HIsExcl[j])
self.data.Ps.insert(excl, self._PsExcl[j])
self.data.Qs.insert(excl, self._QsExcl[j])
self.data.Psupp.insert(excl, self._PsuppExcl[j])
self._idxExcl, self._musMExcl = list(np.sort(exclude)), []
self._HIsExcl, self._PsExcl, self._QsExcl = [], [], []
self._PsuppExcl = []
for excl in self._idxExcl[::-1]:
self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl
self.data.musMarginal.pop(excl)
self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl
self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl
self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl
self._PsuppExcl = [self.data.Psupp.pop(excl)] + self._PsuppExcl
poles = [hi.poles for hi in self.data.HIs]
coeffs = [hi.coeffs for hi in self.data.HIs]
self.initializeFromLists(poles, coeffs, self.data.Psupp,
self.data.HIs[0].polybasis, *args, **kwargs)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny,
basis:str):
"""Initialize Heaviside representation."""
self.data.HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
self.data.HIs += [hsi]
def initializeFromRational(self):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, self.data.Psupp, basis)
def recompressByCutOff(self, tol:float, foci:List[np.complex],
ground:float) -> str:
mu0 = np.mean(foci)
gLocPoles = [potential(hi.poles, foci) - ground <= tol * ground
for hi in self.data.HIs]
nRemPole = np.sum([np.sum(np.logical_not(gLPi)) for gLPi in gLocPoles])
if nRemPole == 0: return " No poles erased."
for hi, gLocPolesi in zip(self.data.HIs, gLocPoles):
N = len(hi.poles)
polyCorrection = np.zeros_like(hi.coeffs[0, :])
for j, goodj in enumerate(gLocPolesi):
if not goodj and not np.isinf(hi.poles[j]):
polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j])
if len(hi.coeffs) == N:
hi.coeffs = np.vstack((hi.coeffs, polyCorrection))
else:
hi.coeffs[N, :] += polyCorrection
hi.poles = hi.poles[gLocPolesi]
gLocCoeffi = np.append(gLocPolesi,
np.ones(len(hi.coeffs) - N, dtype = bool))
hi.coeffs = hi.coeffs[gLocCoeffi, :]
return " Erased {} pole{}.".format(nRemPole, "s" * (nRemPole != 1))
def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant interpolator."""
mu = checkParameterList(mu, self.data.nparMarginal)[0]
vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu),
95)
intM = np.array(self.data.NN(mu), dtype = int)
vbMng(self, "DEL", "Done finding nearest neighbor.", 95)
his = [None] * len(mu)
for j in range(len(mu)):
his[j] = HI()
his[j].poles = copy(self.data.HIs[intM[j]].poles)
his[j].coeffs = copy(self.data.HIs[intM[j]].coeffs)
his[j].npar = 1
his[j].polybasis = self.data.HIs[0].polybasis
if self.data.Psupp[i] is not None:
his[j].pad(self.data.Psupp[i][0],
self.data.projMat.shape[1] - self.data.Psupp[i][1])
return his
def interpolateMarginalPoles(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant poles."""
interps = self.interpolateMarginalInterpolator(mu)
return [interp.poles for interp in interps]
def interpolateMarginalCoeffs(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant poles."""
interps = self.interpolateMarginalInterpolator(mu)
return [interp.coeffs for interp in interps]
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
self.uApproxReduced = emptySampleList()
muP = self.centerNormalizePivot(mu.data[:,
self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
uAppR = hi(mP)
if i == 0:
self.uApproxReduced.reset((len(uAppR), len(mu)),
dtype = uAppR.dtype)
self.uApproxReduced[i] = uAppR
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
p = emptySampleList()
muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
Pval = hi(mP) * np.prod(mP[0] - hi.poles)
if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype)
p[i] = Pval
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = checkParameterList(mu, self.data.npar)[0]
muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
if der is None:
derP, derM = 0, [0]
else:
derP = der[self.data.directionPivot[0]]
derM = [der[x] for x in self.data.directionMarginal]
if np.any(np.array(derM) != 0):
raise RROMPyException(("Derivatives of Q with respect to marginal "
"parameters not allowed."))
sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
derVal = np.zeros(len(mu), dtype = np.complex)
pls = self.interpolateMarginalPoles(muM)
for i, (mP, pl) in enumerate(zip(muP, pls)):
N = len(pl)
if derP == N: derVal[i] = 1.
elif derP >= 0 and derP < N:
plDist = muP[0] - pl
for terms in combinations(np.arange(N), N - derP):
derVal[i] += np.prod(plDist[list(terms)], axis = 1)
return sclP ** derP * fact(derP) * derVal
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
try:
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise
except:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = self.interpolateMarginalPoles(mMarg)[0]
return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim]
+ self.data.scaleFactor[rDim] * roots,
1. / self.data.rescalingExp[rDim])
- def getResidues(self, *args, **kwargs) -> Np1D:
+ def getResidues(self, *args, **kwargs) -> Np2D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
elif len(args) == 0:
mVals = [None]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
- res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :].T
- if not self.data._collapsed:
- res = self.data.projMat.dot(res)
+ res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :]
+ if not self.data._collapsed: res = self.data.projMat.dot(res.T).T
return pls, res
diff --git a/rrompy/reduction_methods/standard/rational_interpolant.py b/rrompy/reduction_methods/standard/rational_interpolant.py
index af465eb..42793f4 100644
--- a/rrompy/reduction_methods/standard/rational_interpolant.py
+++ b/rrompy/reduction_methods/standard/rational_interpolant.py
@@ -1,680 +1,680 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from copy import deepcopy as copy
import numpy as np
from rrompy.reduction_methods.base import checkRobustTolerance
from .generic_standard_approximant import GenericStandardApproximant
from rrompy.utilities.poly_fitting.polynomial import (
polybases as ppb, polyfitname,
polyvander as pvP, polyvanderTotal as pvTP,
polyTimes, polyTimesTable, vanderInvTable,
PolynomialInterpolator as PI)
from rrompy.utilities.poly_fitting.heaviside import rational2heaviside
from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb,
RadialBasisInterpolator as RBI)
from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List, sampList
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.numerical import customPInv, dot, potential
from rrompy.utilities.numerical.hash_derivative import nextDerivativeIndices
from rrompy.utilities.numerical.degree import (reduceDegreeN,
degreeTotalToFull, fullDegreeMaxMask,
totalDegreeMaxMask)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
__all__ = ['RationalInterpolant']
class RationalInterpolant(GenericStandardApproximant):
"""
ROM rational interpolant computation for parametric problems.
Args:
HFEngine: HF problem solver.
mu0(optional): Default parameter. Defaults to 0.
approxParameters(optional): Dictionary containing values for main
parameters of approximant. Recognized keys are:
- 'POD': whether to compute POD of snapshots; defaults to True;
- 'scaleFactorDer': scaling factors for derivative computation;
defaults to 'AUTO';
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator;
- 'polybasis': type of polynomial basis for interpolation; defaults
to 'MONOMIAL';
- 'M': degree of rational interpolant numerator; defaults to
'AUTO', i.e. maximum allowed;
- 'N': degree of rational interpolant denominator; defaults to
'AUTO', i.e. maximum allowed;
- 'polydegreetype': type of polynomial degree; defaults to 'TOTAL';
- 'radialDirectionalWeights': radial basis weights for interpolant
numerator; defaults to 1;
- 'interpRcond': tolerance for interpolation; defaults to None;
- 'robustTol': tolerance for robust rational denominator
management; defaults to 0;
- 'correctorForce': whether corrector should forcefully delete bad
poles; defaults to False;
- 'correctorTol': tolerance for corrector step; defaults to 0.,
i.e. no bad poles;
- 'correctorMaxIter': maximum number of corrector iterations;
defaults to 1.
Defaults to empty dict.
approx_state(optional): Whether to approximate state. Defaults to
False.
verbosity(optional): Verbosity level. Defaults to 10.
Attributes:
HFEngine: HF problem solver.
mu0: Default parameter.
mus: Array of snapshot parameters.
approxParameters: Dictionary containing values for main parameters of
approximant. Recognized keys are in parameterList.
parameterListSoft: Recognized keys of soft approximant parameters:
- 'POD': whether to compute POD of snapshots;
- 'scaleFactorDer': scaling factors for derivative computation;
- 'polybasis': type of polynomial basis for interpolation;
- 'M': degree of rational interpolant numerator;
- 'N': degree of rational interpolant denominator;
- 'polydegreetype': type of polynomial degree;
- 'radialDirectionalWeights': radial basis weights for interpolant
numerator;
- 'interpRcond': tolerance for interpolation via numpy.polyfit;
- 'robustTol': tolerance for robust rational denominator
management;
- 'correctorForce': whether corrector should forcefully delete bad
poles;
- 'correctorTol': tolerance for corrector step;
- 'correctorMaxIter': maximum number of corrector iterations.
parameterListCritical: Recognized keys of critical approximant
parameters:
- 'S': total number of samples current approximant relies upon;
- 'sampler': sample point generator.
approx_state: Whether to approximate state.
verbosity: Verbosity level.
POD: Whether to compute POD of snapshots.
scaleFactorDer: Scaling factors for derivative computation.
S: Number of solution snapshots over which current approximant is
based upon.
sampler: Sample point generator.
polybasis: type of polynomial basis for interpolation.
M: Numerator degree of approximant.
N: Denominator degree of approximant.
polydegreetype: Type of polynomial degree.
radialDirectionalWeights: Radial basis weights for interpolant
numerator.
interpRcond: Tolerance for interpolation via numpy.polyfit.
robustTol: Tolerance for robust rational denominator management.
correctorForce: Whether corrector should forcefully delete bad poles.
correctorTol: Tolerance for corrector step.
correctorMaxIter: Maximum number of corrector iterations.
muBounds: list of bounds for parameter values.
samplingEngine: Sampling engine.
uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as
sampleList.
lastSolvedHF: Parameter(s) corresponding to last computed high fidelity
solution(s) as parameterList.
uApproxReduced: Reduced approximate solution(s) with parameter(s)
lastSolvedApprox as sampleList.
lastSolvedApproxReduced: Parameter(s) corresponding to last computed
reduced approximate solution(s) as parameterList.
uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as
sampleList.
lastSolvedApprox: Parameter(s) corresponding to last computed
approximate solution(s) as parameterList.
Q: Numpy 1D vector containing complex coefficients of approximant
denominator.
P: Numpy 2D vector whose columns are FE dofs of coefficients of
approximant numerator.
"""
def __init__(self, *args, **kwargs):
self._preInit()
self._addParametersToList(["polybasis", "M", "N", "polydegreetype",
"radialDirectionalWeights", "interpRcond",
"robustTol", "correctorForce",
"correctorTol", "correctorMaxIter"],
["MONOMIAL", "AUTO", "AUTO", "TOTAL", [1.],
-1, 0., False, 0., 1])
super().__init__(*args, **kwargs)
self.catchInstability = 0
self._postInit()
@property
def tModelType(self):
from .trained_model.trained_model_rational import TrainedModelRational
return TrainedModelRational
@property
def polybasis(self):
"""Value of polybasis."""
return self._polybasis
@polybasis.setter
def polybasis(self, polybasis):
try:
polybasis = polybasis.upper().strip().replace(" ","")
if polybasis not in ppb + rbpb:
raise RROMPyException("Prescribed polybasis not recognized.")
self._polybasis = polybasis
except:
RROMPyWarning(("Prescribed polybasis not recognized. Overriding "
"to 'MONOMIAL'."))
self._polybasis = "MONOMIAL"
self._approxParameters["polybasis"] = self.polybasis
@property
def polybasis0(self):
if "_" in self.polybasis:
return self.polybasis.split("_")[0]
return self.polybasis
@property
def interpRcond(self):
"""Value of interpRcond."""
return self._interpRcond
@interpRcond.setter
def interpRcond(self, interpRcond):
self._interpRcond = interpRcond
self._approxParameters["interpRcond"] = self.interpRcond
@property
def radialDirectionalWeights(self):
"""Value of radialDirectionalWeights."""
return self._radialDirectionalWeights
@radialDirectionalWeights.setter
def radialDirectionalWeights(self, radialDirectionalWeights):
if not hasattr(radialDirectionalWeights, "__len__"):
radialDirectionalWeights = [radialDirectionalWeights]
self._radialDirectionalWeights = radialDirectionalWeights
self._approxParameters["radialDirectionalWeights"] = (
self.radialDirectionalWeights)
@property
def M(self):
"""Value of M."""
return self._M
@M.setter
def M(self, M):
if isinstance(M, str):
M = M.strip().replace(" ","")
if "-" not in M: M = M + "-0"
self._M_isauto, self._M_shift = True, int(M.split("-")[-1])
M = 0
if M < 0: raise RROMPyException("M must be non-negative.")
self._M = M
self._approxParameters["M"] = self.M
def _setMAuto(self):
self.M = max(0, reduceDegreeN(self.S, self.S, self.npar,
self.polydegreetype) - self._M_shift)
vbMng(self, "MAIN", "Automatically setting M to {}.".format(self.M),
25)
@property
def N(self):
"""Value of N."""
return self._N
@N.setter
def N(self, N):
if isinstance(N, str):
N = N.strip().replace(" ","")
if "-" not in N: N = N + "-0"
self._N_isauto, self._N_shift = True, int(N.split("-")[-1])
N = 0
if N < 0: raise RROMPyException("N must be non-negative.")
self._N = N
self._approxParameters["N"] = self.N
def _setNAuto(self):
self.N = max(0, reduceDegreeN(self.S, self.S, self.npar,
self.polydegreetype) - self._N_shift)
vbMng(self, "MAIN", "Automatically setting N to {}.".format(self.N),
25)
@property
def polydegreetype(self):
"""Value of polydegreetype."""
return self._polydegreetype
@polydegreetype.setter
def polydegreetype(self, polydegreetype):
try:
polydegreetype = polydegreetype.upper().strip().replace(" ","")
if polydegreetype not in ["TOTAL", "FULL"]:
raise RROMPyException(("Prescribed polydegreetype not "
"recognized."))
self._polydegreetype = polydegreetype
except:
RROMPyWarning(("Prescribed polydegreetype not recognized. "
"Overriding to 'TOTAL'."))
self._polydegreetype = "TOTAL"
self._approxParameters["polydegreetype"] = self.polydegreetype
@property
def robustTol(self):
"""Value of tolerance for robust rational denominator management."""
return self._robustTol
@robustTol.setter
def robustTol(self, robustTol):
if robustTol < 0.:
RROMPyWarning(("Overriding prescribed negative robustness "
"tolerance to 0."))
robustTol = 0.
self._robustTol = robustTol
self._approxParameters["robustTol"] = self.robustTol
@property
def correctorForce(self):
"""Value of correctorForce."""
return self._correctorForce
@correctorForce.setter
def correctorForce(self, correctorForce):
self._correctorForce = correctorForce
self._approxParameters["correctorForce"] = self.correctorForce
@property
def correctorTol(self):
"""Value of correctorTol."""
return self._correctorTol
@correctorTol.setter
def correctorTol(self, correctorTol):
if correctorTol < 0. or (correctorTol > 0. and self.npar > 1):
RROMPyWarning(("Overriding prescribed corrector tolerance "
"to 0."))
correctorTol = 0.
self._correctorTol = correctorTol
self._approxParameters["correctorTol"] = self.correctorTol
@property
def correctorMaxIter(self):
"""Value of correctorMaxIter."""
return self._correctorMaxIter
@correctorMaxIter.setter
def correctorMaxIter(self, correctorMaxIter):
if correctorMaxIter < 1 or (correctorMaxIter > 1 and self.npar > 1):
RROMPyWarning(("Overriding prescribed max number of corrector "
"iterations to 1."))
correctorMaxIter = 1
self._correctorMaxIter = correctorMaxIter
self._approxParameters["correctorMaxIter"] = self.correctorMaxIter
def resetSamples(self):
"""Reset samples."""
super().resetSamples()
self._musUniqueCN = None
self._derIdxs = None
self._reorder = None
def _setupInterpolationIndices(self):
"""Setup parameters for polyvander."""
if self._musUniqueCN is None or len(self._reorder) != len(self.mus):
self._musUniqueCN, musIdxsTo, musIdxs, musCount = (
self.trainedModel.centerNormalize(self.mus).unique(
return_index = True, return_inverse = True,
return_counts = True))
self._musUnique = self.mus[musIdxsTo]
self._derIdxs = [None] * len(self._musUniqueCN)
self._reorder = np.empty(len(musIdxs), dtype = int)
filled = 0
for j, cnt in enumerate(musCount):
self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1],
cnt)
jIdx = np.nonzero(musIdxs == j)[0]
self._reorder[jIdx] = np.arange(filled, filled + cnt)
filled += cnt
def _setupDenominator(self):
"""Compute rational denominator."""
RROMPyAssert(self._mode, message = "Cannot setup denominator.")
vbMng(self, "INIT", "Starting computation of denominator.", 7)
if hasattr(self, "_N_isauto"):
self._setNAuto()
else:
N = reduceDegreeN(self.N, self.S, self.npar, self.polydegreetype)
if N < self.N:
RROMPyWarning(("N too large compared to S. Reducing N by "
"{}").format(self.N - N))
self.N = N
while self.N > 0:
invD, fitinv = self._computeInterpolantInverseBlocks()
idxSamplesEff = list(range(self.S))
if self.POD:
ev, eV = self.findeveVGQR(
self.samplingEngine.RPOD[:, idxSamplesEff], invD)
else:
ev, eV = self.findeveVGExplicit(
self.samplingEngine.samples(idxSamplesEff), invD)
nevBad = checkRobustTolerance(ev, self.robustTol)
if nevBad <= 1: break
if self.catchInstability > 0:
raise RROMPyException(("Instability in denominator "
"computation: eigenproblem is poorly "
"conditioned."),
self.catchInstability == 1)
vbMng(self, "MAIN", ("Smallest {} eigenvalues below tolerance. "
"Reducing N by 1.").format(nevBad), 10)
self.N = self.N - 1
if self.N <= 0:
self.N = 0
eV = np.ones((1, 1))
q = PI()
q.npar = self.npar
q.polybasis = self.polybasis0
if self.polydegreetype == "TOTAL":
q.coeffs = degreeTotalToFull(tuple([self.N + 1] * self.npar),
self.npar, eV[:, 0])
else:
q.coeffs = eV[:, 0].reshape([self.N + 1] * self.npar)
vbMng(self, "DEL", "Done computing denominator.", 7)
return q, fitinv
def _setupNumerator(self):
"""Compute rational numerator."""
RROMPyAssert(self._mode, message = "Cannot setup numerator.")
vbMng(self, "INIT", "Starting computation of numerator.", 7)
self._setupInterpolationIndices()
Qevaldiag = polyTimesTable(self.trainedModel.data.Q, self._musUniqueCN,
self._reorder, self._derIdxs,
self.scaleFactorRel)
if self.POD: Qevaldiag = Qevaldiag.dot(self.samplingEngine.RPOD.T)
if hasattr(self, "_M_isauto"):
self._setMAuto()
M = self.M
else:
M = reduceDegreeN(self.M, self.S, self.npar, self.polydegreetype)
if M < self.M:
RROMPyWarning(("M too large compared to S. Reducing M by "
"{}").format(self.M - M))
self.M = M
while self.M >= 0:
pParRest = [self.verbosity >= 5, self.polydegreetype == "TOTAL",
{"derIdxs": self._derIdxs, "reorder": self._reorder,
"scl": self.scaleFactorRel}]
if self.polybasis in ppb:
p = PI()
else:
self.computeScaleFactor()
rDWEff = [w * f for w, f in zip(self.radialDirectionalWeights,
self.scaleFactor)]
pParRest = [rDWEff] + pParRest
p = RBI()
if self.polybasis in ppb + rbpb:
pParRest += [{"rcond": self.interpRcond}]
wellCond, msg = p.setupByInterpolation(self._musUniqueCN,
Qevaldiag, self.M,
self.polybasis, *pParRest)
vbMng(self, "MAIN", msg, 5)
if wellCond: break
if self.catchInstability > 0:
raise RROMPyException(("Instability in numerator computation: "
"polyfit is poorly conditioned."),
self.catchInstability == 1)
vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing M "
"by 1."), 10)
self.M = self.M - 1
if self.M < 0:
raise RROMPyException(("Instability in computation of numerator. "
"Aborting."))
self.M = M
vbMng(self, "DEL", "Done computing numerator.", 7)
return p
def setupApprox(self) -> int:
"""Compute rational interpolant."""
if self.checkComputedApprox(): return -1
RROMPyAssert(self._mode, message = "Cannot setup approximant.")
vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5)
self.computeSnapshots()
pMat = self.samplingEngine.samples.data
pMatEff = dot(self.HFEngine.C, pMat) if self.approx_state else pMat
if self.trainedModel is None:
self.trainedModel = self.tModelType()
self.trainedModel.verbosity = self.verbosity
self.trainedModel.timestamp = self.timestamp
datadict = {"mu0": self.mu0, "mus": copy(self.mus),
"projMat": pMatEff, "scaleFactor": self.scaleFactor,
"rescalingExp": self.HFEngine.rescalingExp}
self.trainedModel.data = self.initializeModelData(datadict)[0]
else:
self.trainedModel = self.trainedModel
self.trainedModel.data.projMat = copy(pMatEff)
self.trainedModel.data.mus = copy(self.mus)
self._iterCorrector()
self.trainedModel.data.approxParameters = copy(self.approxParameters)
vbMng(self, "DEL", "Done setting up approximant.", 5)
return 0
def _iterCorrector(self):
noCorrector = self.correctorTol <= 0. or (self.correctorMaxIter <= 1
and not self.correctorForce)
if noCorrector:
vbMng(self, "INIT", "Starting approximant finalization.", 5)
else:
vbMng(self, "INIT", "Starting correction iterations.", 5)
self._Qhat = PI()
self._Qhat.npar = self.npar
self._Qhat.polybasis = "MONOMIAL"
self._Qhat.coeffs = np.ones(1, dtype = np.complex)
if self.POD:
self._samplesOld = copy(self.samplingEngine.RPOD)
else:
self._samplesOld = copy(self.samplingEngine.samples)
Nauto = hasattr(self, "_N_isauto")
for j in range(self.correctorMaxIter):
if self.N > 0 or (Nauto and self.S > self.npar):
Q = self._setupDenominator()[0]
if hasattr(self, "_N_isauto"): del self._N_isauto
else:
Q = PI()
Q.coeffs = np.ones((1,) * self.npar, dtype = np.complex)
Q.npar = self.npar
Q.polybasis = self.polybasis
self.N = 0
if j == 0: _N0 = self.N
self.trainedModel.data.Q = Q
self.trainedModel.data.P = self._setupNumerator()
if noCorrector or (j >= self.correctorMaxIter - 1
and not self.correctorForce):
self.N = 0
else:
self._applyCorrector(j)
if self.N <= 0: break
if Nauto: self._N_isauto = True
self.N = _N0
if noCorrector:
vbMng(self, "DEL", "Terminated approximant finalization.", 5)
return
if self.POD:
self.samplingEngine.RPOD = self._samplesOld
else:
self.samplingEngine.samples = self._samplesOld
del self._samplesOld
if self.correctorForce:
QOld, QOldBasis = [1.], "MONOMIAL"
else:
QOld, QOldBasis = Q.coeffs, self.polybasis
Q = polyTimes(self._Qhat.coeffs, QOld, Pbasis = self._Qhat.polybasis,
Qbasis = QOldBasis, Rbasis = self.polybasis)
del self._Qhat
gamma = np.linalg.norm(Q)
self.trainedModel.data.Q.coeffs = np.pad(Q, (0, self.N - len(Q) + 1),
"constant") / gamma
if self.correctorForce:
self.trainedModel.data.P = self._setupNumerator()
else:
self.trainedModel.data.P.coeffs /= gamma
vbMng(self, "DEL", "Terminated correction iterations.", 5)
def _applyCorrector(self, j:int):
cfs, pls, _ = rational2heaviside(self.trainedModel.data.P,
self.trainedModel.data.Q)
cfs = cfs[: self.N]
if self.POD:
resEff = np.linalg.norm(cfs, axis = 1)
else:
resEff = self.HFEngine.norm(self.samplingEngine.samples.dot(cfs.T),
is_state = self.approx_state)
potentialEff = (potential(pls, self.sampler.normalFoci())
/ self.sampler.groundPotential())
potentialEff[np.logical_or(potentialEff < 1., np.isinf(pls))] = 1.
resEff[np.isinf(pls)] = 0.
resEff /= potentialEff
idxKeep = np.where(resEff >= self.correctorTol * np.max(resEff))[0]
vbMng(self, "MAIN",
("Correction iteration no. {}: {} out of {} residuals satisfy "
"tolerance.").format(j + 1, len(idxKeep), self.N), 10)
self.N -= len(idxKeep)
if ((self.N <= 0 or j < self.correctorMaxIter - 1)
and not self.correctorForce): return
for i in idxKeep:
self._Qhat.coeffs = polyTimes(self._Qhat.coeffs, [- pls[i], 1.],
Pbasis = self._Qhat.polybasis,
Rbasis = self._Qhat.polybasis)
self._Qhat.coeffs /= np.linalg.norm(self._Qhat.coeffs)
if self.N <= 0: return
vbMng(self, "MAIN",
("Removing poles at (normalized positions): "
"{}.").format(pls[resEff < self.correctorTol * np.max(resEff)]),
65)
if j < self.correctorMaxIter - 1:
That = polyTimesTable(self._Qhat, self._musUniqueCN,
self._reorder, self._derIdxs,
self.scaleFactorRel).T
if self.POD:
self.samplingEngine.RPOD = self._samplesOld.dot(That)
else:
self.samplingEngine.samples = self._samplesOld.dot(That)
def _computeInterpolantInverseBlocks(self) -> Tuple[List[Np2D], Np2D]:
"""
Compute inverse factors for minimal interpolant target functional.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
self._setupInterpolationIndices()
TEGen = pvTP if self.polydegreetype == "TOTAL" else pvP
TEGenPar = [self.polybasis0, self._derIdxs, self._reorder,
self.scaleFactorRel]
if hasattr(self, "_M_isauto"): self._setMAuto()
E = max(self.M, self.N)
while E >= 0:
if self.polydegreetype == "TOTAL":
Eeff = E
idxsB = totalDegreeMaxMask(E, self.npar)
else: #if self.polydegreetype == "FULL":
Eeff = [E] * self.npar
idxsB = fullDegreeMaxMask(E, self.npar)
TE = TEGen(self._musUniqueCN, Eeff, *TEGenPar)
fitOut = customPInv(TE, rcond = self.interpRcond, full = True)
vbMng(self, "MAIN",
("Fitting {} samples with degree {} through {}... "
"Conditioning of pseudoinverse system: {:.4e}.").format(
TE.shape[0], E,
polyfitname(self.polybasis0),
fitOut[1][1][0] / fitOut[1][1][-1]),
5)
if fitOut[1][0] == TE.shape[1]:
fitinv = fitOut[0][idxsB, :]
break
if self.catchInstability > 0:
raise RROMPyException(("Instability in denominator "
"computation: polyfit is poorly "
"conditioned."),
self.catchInstability == 1)
EeqN = E == self.N
vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing E {}"
"by 1.").format("and N " * EeqN), 10)
if EeqN: self.N = self.N - 1
E -= 1
if self.N < 0:
raise RROMPyException(("Instability in computation of "
"denominator. Aborting."))
invD = vanderInvTable(fitinv, idxsB, self._reorder, self._derIdxs)
if self.N == E:
TN = TE
else:
if self.polydegreetype == "TOTAL":
Neff = self.N
idxsB = totalDegreeMaxMask(self.N, self.npar)
else: #if self.polydegreetype == "FULL":
Neff = [self.N] * self.npar
idxsB = fullDegreeMaxMask(self.N, self.npar)
TN = TEGen(self._musUniqueCN, Neff, *TEGenPar)
for k in range(len(invD)): invD[k] = dot(invD[k], TN)
return invD, fitinv
def findeveVGExplicit(self, sampleE:sampList,
invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute explicitly eigenvalues and eigenvectors of rational denominator
matrix.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
eWidth = len(invD)
vbMng(self, "INIT", "Building gramian matrix.", 10)
gramian = self.HFEngine.innerProduct(sampleE, sampleE,
is_state = self.approx_state)
G = np.zeros((nEnd, nEnd), dtype = np.complex)
for k in range(eWidth):
G += dot(dot(gramian, invD[k]).T, invD[k].conj()).T
vbMng(self, "DEL", "Done building gramian.", 10)
vbMng(self, "INIT", "Solving eigenvalue problem for gramian matrix.",
7)
try:
ev, eV = np.linalg.eigh(G)
except np.linalg.LinAlgError as e:
raise RROMPyException(e)
vbMng(self, "MAIN",
("Solved eigenvalue problem of size {} with condition number "
"{:.4e}.").format(nEnd, ev[-1] / ev[0]), 5)
vbMng(self, "DEL", "Done solving eigenvalue problem.", 7)
return ev, eV
def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]:
"""
Compute eigenvalues and eigenvectors of rational denominator matrix
through SVD of R factor.
"""
RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.")
nEnd = invD[0].shape[1]
S = RPODE.shape[0]
eWidth = len(invD)
vbMng(self, "INIT", "Building half-gramian matrix stack.", 10)
Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex)
for k in range(eWidth):
Rstack[k * S : (k + 1) * S, :] = dot(RPODE, invD[k])
vbMng(self, "DEL", "Done building half-gramian.", 10)
vbMng(self, "INIT", "Solving svd for square root of gramian matrix.",
7)
try:
_, s, eV = np.linalg.svd(Rstack, full_matrices = False)
except np.linalg.LinAlgError as e:
raise RROMPyException(e)
ev = s[::-1]
eV = eV[::-1, :].T.conj()
vbMng(self, "MAIN",
("Solved svd problem of size {} x {} with condition number "
"{:.4e}.").format(*Rstack.shape, s[0] / s[-1]), 5)
vbMng(self, "DEL", "Done solving svd.", 7)
return ev, eV
- def getResidues(self, *args, **kwargs) -> Np1D:
+ def getResidues(self, *args, **kwargs) -> Np2D:
"""
Obtain approximant residues.
Returns:
Matrix with residues as columns.
"""
return self.trainedModel.getResidues(*args, **kwargs)
diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py
index 1f9e7b3..959b664 100644
--- a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py
+++ b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py
@@ -1,194 +1,194 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from rrompy.reduction_methods.base.trained_model.trained_model import (
TrainedModel)
from rrompy.utilities.numerical.compress_matrix import compressMatrix
-from rrompy.utilities.base.types import (Np1D, List, paramVal, paramList,
+from rrompy.utilities.base.types import (Np1D, Np2D, List, paramVal, paramList,
sampList)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning
from rrompy.parameter import (checkParameter, checkParameterList,
emptyParameterList)
from rrompy.sampling import sampleList
__all__ = ['TrainedModelRational']
class TrainedModelRational(TrainedModel):
"""
ROM approximant evaluation for rational approximant.
Attributes:
Data: dictionary with all that can be pickled.
"""
def compress(self, collapse : bool = False, tol : float = 0., *args,
**kwargs):
if not collapse and tol <= 0.: return
RMat = self.data.projMat
if not collapse:
if hasattr(self.data, "_compressTol"):
RROMPyWarning(("Recompressing already compressed model is "
"ineffective. Aborting."))
return
self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args,
**kwargs)
self.data.P.postmultiplyTensorize(RMat.T)
super().compress(collapse, tol)
def centerNormalize(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0.
Returns:
Normalized parameter.
"""
mu = checkParameterList(mu, self.data.npar)[0]
if mu0 is None: mu0 = self.data.mu0
rad = ((mu ** self.data.rescalingExp - mu0 ** self.data.rescalingExp)
/ self.data.scaleFactor)
return rad
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
mu = checkParameterList(mu, self.data.npar)[0]
vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17)
muCenter = self.centerNormalize(mu)
p = sampleList(self.data.P(muCenter))
vbMng(self, "DEL", "Done evaluating numerator.", 17)
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
mu = checkParameterList(mu, self.data.npar)[0]
vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu),
17)
muCenter = self.centerNormalize(mu)
q = self.data.Q(muCenter, der, scl)
vbMng(self, "DEL", "Done evaluating denominator.", 17)
return q
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
mu = checkParameterList(mu, self.data.npar)[0]
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
QV = self.getQVal(mu)
QVzero = np.where(QV == 0.)[0]
if len(QVzero) > 0:
RROMPyWarning(("Adjusting approximation to avoid division by "
"numerically zero denominator."))
QV[QVzero] = np.finfo(np.complex).eps / (1.
+ self.data.Q.deg[0])
self.uApproxReduced = self.getPVal(mu) / QV
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
try:
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise
except:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
mVals[rDim] = self.data.mu0(rDim)
mVals = self.centerNormalize(checkParameter(mVals, len(mVals)))
mVals = list(mVals.data.flatten())
mVals[rDim] = fp
return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim]
+ self.data.scaleFactor[rDim] * self.data.Q.roots(mVals),
1. / self.data.rescalingExp[rDim])
- def getResidues(self, *args, **kwargs) -> Np1D:
+ def getResidues(self, *args, **kwargs) -> Np2D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
elif len(args) == 0:
mVals = [None]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
poles = emptyParameterList()
poles.reset((len(pls), self.data.npar), dtype = pls.dtype)
for k, pl in enumerate(pls):
poles[k] = mVals
poles.data[k, rDim] = pl
QV = self.getQVal(poles, list(1 * (np.arange(self.data.npar) == rDim)))
QVzero = np.where(QV == 0.)[0]
if len(QVzero) > 0:
RROMPyWarning(("Adjusting residuals to avoid division by "
"numerically zero denominator."))
QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0])
Res = self.getPVal(poles).data
if not self.data._collapsed:
Res = self.data.projMat[:, : Res.shape[0]].dot(Res)
res = Res / QV
- return pls, res
+ return pls, res.T