diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
index 8045aca..1dc72ae 100644
--- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
+++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py
@@ -1,350 +1,350 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
from scipy.special import factorial as fact
from itertools import combinations
from rrompy.reduction_methods.standard.trained_model.trained_model_rational \
import TrainedModelRational
from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal,
paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp
from rrompy.utilities.numerical import dot
from rrompy.utilities.numerical.compress_matrix import compressMatrix
from rrompy.utilities.numerical.point_matching import potential
from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside,
HeavisideInterpolator as HI)
from rrompy.utilities.poly_fitting.nearest_neighbor import (
NearestNeighborInterpolator as NNI)
from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
RROMPyWarning)
from rrompy.parameter import checkParameterList
from rrompy.sampling import sampleList, emptySampleList
__all__ = ['TrainedModelPivotedRationalNoMatch']
class TrainedModelPivotedRationalNoMatch(TrainedModelRational):
"""
ROM approximant evaluation for pivoted approximants based on interpolation
of rational approximants (without pole matching).
Attributes:
Data: dictionary with all that can be pickled.
"""
def checkParameterListPivot(self, mu:paramList,
check_if_single : bool = False) -> paramList:
return checkParameterList(mu, self.data.nparPivot, check_if_single)
def checkParameterListMarginal(self, mu:paramList,
check_if_single : bool = False) -> paramList:
return checkParameterList(mu, self.data.nparMarginal, check_if_single)
def compress(self, collapse : bool = False, tol : float = 0., *args,
**kwargs):
if not collapse and tol <= 0.: return
RMat = self.data.projMat
if not collapse:
if hasattr(self.data, "_compressTol"):
RROMPyWarning(("Recompressing already compressed model is "
"ineffective. Aborting."))
return
self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args,
**kwargs)
for obj, suppj in zip(self.data.HIs, self.data.Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if hasattr(self, "_HIsExcl"):
for obj, suppj in zip(self._HIsExcl, self.data.Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if hasattr(self.data, "Ps"):
for obj, suppj in zip(self.data.Ps, self.data.Psupp):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if hasattr(self, "_PsExcl"):
for obj, suppj in zip(self._PsExcl, self._PsuppExcl):
obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]])
if hasattr(self.data, "coeffsEff"):
for j in range(len(self.data.coeffsEff)):
self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T)
if hasattr(self, "_HIsExcl") or hasattr(self, "_PsExcl"):
self._PsuppExcl = [0] * len(self._PsuppExcl)
self.data.Psupp = [0] * len(self.data.Psupp)
super(TrainedModelRational, self).compress(collapse, tol)
def centerNormalizePivot(self, mu : paramList = [],
mu0 : paramVal = None) -> paramList:
"""
Compute normalized parameter to be plugged into approximant.
Args:
mu: Parameter(s) 1.
mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot.
Returns:
Normalized parameter.
"""
mu = self.checkParameterListPivot(mu)
if mu0 is None:
mu0 = self.checkParameterListPivot(
self.data.mu0(0, self.data.directionPivot))
return (self.mapParameterList(mu, idx = self.data.directionPivot)
- self.mapParameterList(mu0, idx = self.data.directionPivot)
) / [self.data.scaleFactor[x] for x in self.data.directionPivot]
def setupMarginalInterp(self, interpPars:ListAny):
- self.data.NN = NNI()
- self.data.NN.setupByInterpolation(self.data.musMarginal,
+ self.data.marginalInterp = NNI()
+ self.data.marginalInterp.setupByInterpolation(self.data.musMarginal,
np.arange(len(self.data.musMarginal)),
1, *interpPars)
def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs):
if hasattr(self, "_idxExcl"):
for j, excl in enumerate(self._idxExcl):
self.data.musMarginal.insert(self._musMExcl[j], excl)
self.data.HIs.insert(excl, self._HIsExcl[j])
self.data.Ps.insert(excl, self._PsExcl[j])
self.data.Qs.insert(excl, self._QsExcl[j])
self.data.Psupp.insert(excl, self._PsuppExcl[j])
self._idxExcl, self._musMExcl = list(np.sort(exclude)), []
self._HIsExcl, self._PsExcl, self._QsExcl = [], [], []
self._PsuppExcl = []
for excl in self._idxExcl[::-1]:
self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl
self.data.musMarginal.pop(excl)
self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl
self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl
self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl
self._PsuppExcl = [self.data.Psupp.pop(excl)] + self._PsuppExcl
poles = [hi.poles for hi in self.data.HIs]
coeffs = [hi.coeffs for hi in self.data.HIs]
self.initializeFromLists(poles, coeffs, self.data.Psupp,
self.data.HIs[0].polybasis, *args, **kwargs)
def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny,
basis:str, *args, **kwargs):
"""Initialize Heaviside representation."""
self.data.HIs = []
for pls, cfs in zip(poles, coeffs):
hsi = HI()
hsi.poles = pls
if len(cfs) == len(pls):
cfs = np.pad(cfs, ((0, 1), (0, 0)), "constant")
hsi.coeffs = cfs
hsi.npar = 1
hsi.polybasis = basis
self.data.HIs += [hsi]
def initializeFromRational(self, *args, **kwargs):
"""Initialize Heaviside representation."""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
poles, coeffs = [], []
for Q, P in zip(self.data.Qs, self.data.Ps):
cfs, pls, basis = rational2heaviside(P, Q)
poles += [pls]
coeffs += [cfs]
self.initializeFromLists(poles, coeffs, self.data.Psupp, basis, *args,
**kwargs)
def recompressByCutOff(self, tol:float, foci:List[np.complex],
ground:float) -> str:
gLocPoles = [np.logical_and(np.logical_not(np.isinf(hi.poles)),
potential(hi.poles, foci) - ground <= tol * ground)
for hi in self.data.HIs]
nRemPole = np.sum([np.sum(np.logical_not(gLPi)) for gLPi in gLocPoles])
if nRemPole == 0: return " No poles erased."
for hi, gLocPolesi in zip(self.data.HIs, gLocPoles):
N = len(hi.poles)
for j, goodj in enumerate(gLocPolesi):
if not goodj and not np.isinf(hi.poles[j]):
hi.coeffs[N, :] -= hi.coeffs[j, :] / hi.poles[j]
hi.poles = hi.poles[gLocPolesi]
gLocCoeffi = np.append(gLocPolesi,
np.ones(len(hi.coeffs) - N, dtype = bool))
hi.coeffs = hi.coeffs[gLocCoeffi, :]
return " Erased {} pole{}.".format(nRemPole, "s" * (nRemPole != 1))
def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant interpolator."""
mu = self.checkParameterListMarginal(mu)
vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu),
95)
his = []
- intM = np.array(self.data.NN(mu), dtype = int)
+ intM = np.array(self.data.marginalInterp(mu), dtype = int)
for j in range(len(mu)):
i = intM[j]
his += [HI()]
his[-1].poles = copy(self.data.HIs[i].poles)
his[-1].coeffs = copy(self.data.HIs[i].coeffs)
his[-1].npar = 1
his[-1].polybasis = self.data.HIs[0].polybasis
if not self.data._collapsed:
his[-1].pad(self.data.Psupp[i],
self.data.projMat.shape[1] - self.data.Psupp[i]
- his[-1].shape[0])
vbMng(self, "DEL", "Done finding nearest neighbor.", 95)
return his
def interpolateMarginalPoles(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant poles."""
interps = self.interpolateMarginalInterpolator(mu)
return [interp.poles for interp in interps]
def interpolateMarginalCoeffs(self, mu : paramList = []) -> ListAny:
"""Obtain interpolated approximant poles."""
interps = self.interpolateMarginalInterpolator(mu)
return [interp.coeffs for interp in interps]
def getApproxReduced(self, mu : paramList = []) -> sampList:
"""
Evaluate reduced representation of approximant at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
if (not hasattr(self, "lastSolvedApproxReduced")
or self.lastSolvedApproxReduced != mu):
vbMng(self, "INIT",
"Evaluating approximant at mu = {}.".format(mu), 12)
muP = self.centerNormalizePivot(mu.data[:,
self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
uAppR = hi(mP)[:, 0]
if i == 0:
uApproxR = np.empty((len(uAppR), len(mu)),
dtype = uAppR.dtype)
uApproxR[:, i] = uAppR
self.uApproxReduced = sampleList(uApproxR)
vbMng(self, "DEL", "Done evaluating approximant.", 12)
self.lastSolvedApproxReduced = mu
return self.uApproxReduced
def getPVal(self, mu : paramList = []) -> sampList:
"""
Evaluate rational numerator at arbitrary parameter.
Args:
mu: Target parameter.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
p = emptySampleList()
muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
his = self.interpolateMarginalInterpolator(muM)
for i, (mP, hi) in enumerate(zip(muP, his)):
Pval = hi(mP) * np.prod(mP[0] - hi.poles)
if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype)
p[i] = Pval
return p
def getQVal(self, mu:Np1D, der : List[int] = None,
scl : Np1D = None) -> Np1D:
"""
Evaluate rational denominator at arbitrary parameter.
Args:
mu: Target parameter.
der(optional): Derivatives to take before evaluation.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
mu = self.checkParameterList(mu)
muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot])
muM = mu.data[:, self.data.directionMarginal]
if der is None:
derP, derM = 0, [0]
else:
derP = der[self.data.directionPivot[0]]
derM = [der[x] for x in self.data.directionMarginal]
if np.any(np.array(derM) != 0):
raise RROMPyException(("Derivatives of Q with respect to marginal "
"parameters not allowed."))
sclP = 1 if scl is None else scl[self.data.directionPivot[0]]
derVal = np.zeros(len(mu), dtype = np.complex)
pls = self.interpolateMarginalPoles(muM)
for i, (mP, pl) in enumerate(zip(muP, pls)):
N = len(pl)
if derP == N: derVal[i] = 1.
elif derP >= 0 and derP < N:
plDist = muP[0] - pl
for terms in combinations(np.arange(N), N - derP):
derVal[i] += np.prod(plDist[list(terms)], axis = 1)
return sclP ** derP * fact(derP) * derVal
def getPoles(self, *args, **kwargs) -> Np1D:
"""
Obtain approximant poles.
Returns:
Numpy complex vector of poles.
"""
RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters")
if len(args) + len(kwargs) > 1:
raise RROMPyException(("Wrong number of parameters passed. "
"Only 1 available."))
elif len(args) + len(kwargs) == 1:
if len(args) == 1:
mVals = args[0]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
else:
mVals = [fp]
try:
rDim = mVals.index(fp)
if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]:
raise
except:
raise RROMPyException(("Exactly 1 'freepar' entry in "
"marginalVals must be provided."))
if rDim != self.data.directionPivot[0]:
raise RROMPyException(("'freepar' entry in marginalVals must "
"coincide with pivot direction."))
mVals[rDim] = self.data.mu0(rDim)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
roots = (self.data.scaleFactor[rDim]
* self.interpolateMarginalPoles(mMarg)[0])
return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim),
idx = [rDim])(0, 0)
+ roots, "B", [rDim])(0)
def getResidues(self, *args, **kwargs) -> Np2D:
"""
Obtain approximant residues.
Returns:
Numpy matrix with residues as columns.
"""
pls = self.getPoles(*args, **kwargs)
if len(args) == 1:
mVals = args[0]
elif len(args) == 0:
mVals = [None]
else:
mVals = kwargs["marginalVals"]
if not hasattr(mVals, "__len__"): mVals = [mVals]
mVals = list(mVals)
rDim = mVals.index(fp)
mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim]
res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :]
if not self.data._collapsed: res = self.data.projMat.dot(res.T).T
return pls, res
diff --git a/rrompy/utilities/base/decorators.py b/rrompy/utilities/base/decorators.py
index c0cb92a..89bea4d 100644
--- a/rrompy/utilities/base/decorators.py
+++ b/rrompy/utilities/base/decorators.py
@@ -1,35 +1,86 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-from numpy import inf
+from numpy import inf, random
+from functools import wraps
-__all__ = ['affine_construct', 'nonaffine_construct', 'threshold']
+__all__ = ['affine_construct', 'nonaffine_construct', 'threshold',
+ 'addWhiteNoise']
def affine_construct(method):
method.is_affine = True
return method
def nonaffine_construct(method):
method.is_affine = False
return method
def threshold(thresh = inf):
def method_with_thresh(method):
method.threshold = thresh
return method
return method_with_thresh
+
+def addWhiteNoise(amplitude : float = 1., seed : int = None,
+ to_solve : bool = True):
+ def add_noise(engine):
+ try:
+ is_engine = "solve" in dir(engine)
+ except:
+ is_engine = False
+ if is_engine: # apply from outside class
+ if to_solve:
+ class engine_with_noise(engine):
+ def solve(self, *args, **kwargs):
+ if not hasattr(self, "rng"):
+ self.rng = random.default_rng(seed)
+ self.__class__.__name__ = (engine.__name__
+ + "+WhiteNoise")
+ sol = super().solve(*args, **kwargs)
+ noise = self.rng.standard_normal(sol.shape)
+ if sol.dtype == complex:
+ noise = noise + 1.j * self.rng.standard_normal(
+ noise.shape)
+ return sol + amplitude * noise
+ else:
+ class engine_with_noise(engine):
+ def b(self, *args, **kwargs):
+ if not hasattr(self, "rng"):
+ self.rng = random.default_rng(seed)
+ self.__class__.__name__ = (engine.__name__
+ + "+WhiteNoise")
+ bb = super().b(*args, **kwargs)
+ noise = self.rng.standard_normal(bb.shape)
+ if bb.dtype == complex:
+ noise = noise + 1.j * self.rng.standard_normal(
+ noise.shape)
+ return bb + amplitude * noise
+ return engine_with_noise
+ # apply from within class
+ @wraps(engine)
+ def solve_with_noise(self, *args, **kwargs):
+ if not hasattr(self, "rng"):
+ self.rng = random.default_rng(seed)
+ self.__class__.__name__ += "+WhiteNoise"
+ sol = engine(self, *args, **kwargs)
+ noise = self.rng.standard_normal(sol.shape)
+ if sol.dtype == complex:
+ noise = noise + 1.j * self.rng.standard_normal(noise.shape)
+ return sol + amplitude * noise
+ return solve_with_noise
+ return add_noise
diff --git a/rrompy/utilities/exception_manager/warning_manager.py b/rrompy/utilities/exception_manager/warning_manager.py
index ec414a2..8be18f3 100644
--- a/rrompy/utilities/exception_manager/warning_manager.py
+++ b/rrompy/utilities/exception_manager/warning_manager.py
@@ -1,35 +1,32 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import traceback as tb
__all__ = ['RROMPyWarning']
def RROMPyWarning(msg : str = "", stacklevel : int = 0):
- from rrompy.utilities.base.verbosity_depth import getTimestamp
+ from rrompy.utilities.base.verbosity_depth import verbosityDepth
frameSummary = tb.extract_stack()[- 3 - stacklevel]
- timestamp = getTimestamp()
if frameSummary.name == "": name = ""
else: name = ", within {}".format(frameSummary.name)
- print("{} \x1b[3mWarning at {}:{}{}:\x1b[0m".format(timestamp,
- frameSummary.filename,
- frameSummary.lineno,
- name))
- print("> \x1b[31m{}\x1b[0m".format(frameSummary.line))
- if len(msg) > 0:
- print(" \x1b[3m{}\x1b[0m\n".format(msg))
+ verbosityDepth("INIT", " \x1b[3mWarning at {}:{}{}:\x1b[0m".format(
+ frameSummary.filename, frameSummary.lineno, name))
+ verbosityDepth("DEL" * (len(msg) == 0) + "MAIN" * (len(msg) > 0),
+ " > \x1b[31m{}\x1b[0m".format(frameSummary.line))
+ if len(msg) > 0: verbosityDepth("DEL", " \x1b[3m{}\x1b[0m".format(msg))
diff --git a/rrompy/utilities/parallel/base.py b/rrompy/utilities/parallel/base.py
index d8b2e4c..476ed2d 100644
--- a/rrompy/utilities/parallel/base.py
+++ b/rrompy/utilities/parallel/base.py
@@ -1,67 +1,67 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from rrompy.utilities.base.verbosity_depth import updateVerbosityCheckpoint
try:
from mpi4py import MPI
_MPI_IS_LOADED = True
COMM, SELF = MPI.COMM_WORLD, MPI.COMM_SELF
except ImportError:
_MPI_IS_LOADED = False
COMM, SELF = None, None
__all__ = ['_MPI_IS_LOADED', 'COMM', 'SELF', 'poolRank', 'poolSize',
'masterCore', 'updateSerialIndex', 'forceSerialExecution',
'allowParallelExecution', 'forcedSerial']
def poolRank() -> int:
if _MPI_IS_LOADED: return COMM.Get_rank()
return 0
def poolSize() -> int:
if _MPI_IS_LOADED: return COMM.Get_size()
return 1
def masterCore() -> bool:
return forcedSerial() or poolRank() == 0
def updateSerialIndex(n : int = 0, delta : bool = True):
global RROMPy_force_serial
if "RROMPy_force_serial" not in globals(): RROMPy_force_serial = 0
if delta: RROMPy_force_serial += n
else: RROMPy_force_serial = n
if RROMPy_force_serial <= 0: del RROMPy_force_serial
def forceSerialExecution(verbosity : bool = False):
updateSerialIndex(1)
- if verbosity: updateVerbosityCheckpoint(1)
+ if verbosity and poolSize() > 1: updateVerbosityCheckpoint(1)
def allowParallelExecution(verbosity : bool = False):
updateSerialIndex(-1)
- if verbosity:
+ if verbosity and poolSize() > 1:
buffer = updateVerbosityCheckpoint(-1)
if buffer is not None:
msg = COMM.gather(buffer, root = 0)
if masterCore():
out = ""
for ms in msg: out += ms
print(out, end = "", flush = True)
def forcedSerial():
global RROMPy_force_serial
return "RROMPy_force_serial" in globals() and RROMPy_force_serial > 0