diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py index 8045aca..1dc72ae 100644 --- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py +++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py @@ -1,350 +1,350 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from scipy.special import factorial as fact from itertools import combinations from rrompy.reduction_methods.standard.trained_model.trained_model_rational \ import TrainedModelRational from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.numerical.point_matching import potential from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside, HeavisideInterpolator as HI) from rrompy.utilities.poly_fitting.nearest_neighbor import ( NearestNeighborInterpolator as NNI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList from rrompy.sampling import sampleList, emptySampleList __all__ = ['TrainedModelPivotedRationalNoMatch'] class TrainedModelPivotedRationalNoMatch(TrainedModelRational): """ ROM approximant evaluation for pivoted approximants based on interpolation of rational approximants (without pole matching). Attributes: Data: dictionary with all that can be pickled. """ def checkParameterListPivot(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparPivot, check_if_single) def checkParameterListMarginal(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparMarginal, check_if_single) def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if not collapse and tol <= 0.: return RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args, **kwargs) for obj, suppj in zip(self.data.HIs, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_HIsExcl"): for obj, suppj in zip(self._HIsExcl, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self.data, "Ps"): for obj, suppj in zip(self.data.Ps, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_PsExcl"): for obj, suppj in zip(self._PsExcl, self._PsuppExcl): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self.data, "coeffsEff"): for j in range(len(self.data.coeffsEff)): self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T) if hasattr(self, "_HIsExcl") or hasattr(self, "_PsExcl"): self._PsuppExcl = [0] * len(self._PsuppExcl) self.data.Psupp = [0] * len(self.data.Psupp) super(TrainedModelRational, self).compress(collapse, tol) def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = self.checkParameterListPivot(mu) if mu0 is None: mu0 = self.checkParameterListPivot( self.data.mu0(0, self.data.directionPivot)) return (self.mapParameterList(mu, idx = self.data.directionPivot) - self.mapParameterList(mu0, idx = self.data.directionPivot) ) / [self.data.scaleFactor[x] for x in self.data.directionPivot] def setupMarginalInterp(self, interpPars:ListAny): - self.data.NN = NNI() - self.data.NN.setupByInterpolation(self.data.musMarginal, + self.data.marginalInterp = NNI() + self.data.marginalInterp.setupByInterpolation(self.data.musMarginal, np.arange(len(self.data.musMarginal)), 1, *interpPars) def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs): if hasattr(self, "_idxExcl"): for j, excl in enumerate(self._idxExcl): self.data.musMarginal.insert(self._musMExcl[j], excl) self.data.HIs.insert(excl, self._HIsExcl[j]) self.data.Ps.insert(excl, self._PsExcl[j]) self.data.Qs.insert(excl, self._QsExcl[j]) self.data.Psupp.insert(excl, self._PsuppExcl[j]) self._idxExcl, self._musMExcl = list(np.sort(exclude)), [] self._HIsExcl, self._PsExcl, self._QsExcl = [], [], [] self._PsuppExcl = [] for excl in self._idxExcl[::-1]: self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl self.data.musMarginal.pop(excl) self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl self._PsuppExcl = [self.data.Psupp.pop(excl)] + self._PsuppExcl poles = [hi.poles for hi in self.data.HIs] coeffs = [hi.coeffs for hi in self.data.HIs] self.initializeFromLists(poles, coeffs, self.data.Psupp, self.data.HIs[0].polybasis, *args, **kwargs) def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny, basis:str, *args, **kwargs): """Initialize Heaviside representation.""" self.data.HIs = [] for pls, cfs in zip(poles, coeffs): hsi = HI() hsi.poles = pls if len(cfs) == len(pls): cfs = np.pad(cfs, ((0, 1), (0, 0)), "constant") hsi.coeffs = cfs hsi.npar = 1 hsi.polybasis = basis self.data.HIs += [hsi] def initializeFromRational(self, *args, **kwargs): """Initialize Heaviside representation.""" RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") poles, coeffs = [], [] for Q, P in zip(self.data.Qs, self.data.Ps): cfs, pls, basis = rational2heaviside(P, Q) poles += [pls] coeffs += [cfs] self.initializeFromLists(poles, coeffs, self.data.Psupp, basis, *args, **kwargs) def recompressByCutOff(self, tol:float, foci:List[np.complex], ground:float) -> str: gLocPoles = [np.logical_and(np.logical_not(np.isinf(hi.poles)), potential(hi.poles, foci) - ground <= tol * ground) for hi in self.data.HIs] nRemPole = np.sum([np.sum(np.logical_not(gLPi)) for gLPi in gLocPoles]) if nRemPole == 0: return " No poles erased." for hi, gLocPolesi in zip(self.data.HIs, gLocPoles): N = len(hi.poles) for j, goodj in enumerate(gLocPolesi): if not goodj and not np.isinf(hi.poles[j]): hi.coeffs[N, :] -= hi.coeffs[j, :] / hi.poles[j] hi.poles = hi.poles[gLocPolesi] gLocCoeffi = np.append(gLocPolesi, np.ones(len(hi.coeffs) - N, dtype = bool)) hi.coeffs = hi.coeffs[gLocCoeffi, :] return " Erased {} pole{}.".format(nRemPole, "s" * (nRemPole != 1)) def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant interpolator.""" mu = self.checkParameterListMarginal(mu) vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu), 95) his = [] - intM = np.array(self.data.NN(mu), dtype = int) + intM = np.array(self.data.marginalInterp(mu), dtype = int) for j in range(len(mu)): i = intM[j] his += [HI()] his[-1].poles = copy(self.data.HIs[i].poles) his[-1].coeffs = copy(self.data.HIs[i].coeffs) his[-1].npar = 1 his[-1].polybasis = self.data.HIs[0].polybasis if not self.data._collapsed: his[-1].pad(self.data.Psupp[i], self.data.projMat.shape[1] - self.data.Psupp[i] - his[-1].shape[0]) vbMng(self, "DEL", "Done finding nearest neighbor.", 95) return his def interpolateMarginalPoles(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant poles.""" interps = self.interpolateMarginalInterpolator(mu) return [interp.poles for interp in interps] def interpolateMarginalCoeffs(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant poles.""" interps = self.interpolateMarginalInterpolator(mu) return [interp.coeffs for interp in interps] def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot]) muM = mu.data[:, self.data.directionMarginal] his = self.interpolateMarginalInterpolator(muM) for i, (mP, hi) in enumerate(zip(muP, his)): uAppR = hi(mP)[:, 0] if i == 0: uApproxR = np.empty((len(uAppR), len(mu)), dtype = uAppR.dtype) uApproxR[:, i] = uAppR self.uApproxReduced = sampleList(uApproxR) vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) p = emptySampleList() muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot]) muM = mu.data[:, self.data.directionMarginal] his = self.interpolateMarginalInterpolator(muM) for i, (mP, hi) in enumerate(zip(muP, his)): Pval = hi(mP) * np.prod(mP[0] - hi.poles) if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype) p[i] = Pval return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) muP = self.centerNormalizePivot(mu.data[:, self.data.directionPivot]) muM = mu.data[:, self.data.directionMarginal] if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] derVal = np.zeros(len(mu), dtype = np.complex) pls = self.interpolateMarginalPoles(muM) for i, (mP, pl) in enumerate(zip(muP, pls)): N = len(pl) if derP == N: derVal[i] = 1. elif derP >= 0 and derP < N: plDist = muP[0] - pl for terms in combinations(np.arange(N), N - derP): derVal[i] += np.prod(plDist[list(terms)], axis = 1) return sclP ** derP * fact(derP) * derVal def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] roots = (self.data.scaleFactor[rDim] * self.interpolateMarginalPoles(mMarg)[0]) return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim), idx = [rDim])(0, 0) + roots, "B", [rDim])(0) def getResidues(self, *args, **kwargs) -> Np2D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :] if not self.data._collapsed: res = self.data.projMat.dot(res.T).T return pls, res diff --git a/rrompy/utilities/base/decorators.py b/rrompy/utilities/base/decorators.py index c0cb92a..89bea4d 100644 --- a/rrompy/utilities/base/decorators.py +++ b/rrompy/utilities/base/decorators.py @@ -1,35 +1,86 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # -from numpy import inf +from numpy import inf, random +from functools import wraps -__all__ = ['affine_construct', 'nonaffine_construct', 'threshold'] +__all__ = ['affine_construct', 'nonaffine_construct', 'threshold', + 'addWhiteNoise'] def affine_construct(method): method.is_affine = True return method def nonaffine_construct(method): method.is_affine = False return method def threshold(thresh = inf): def method_with_thresh(method): method.threshold = thresh return method return method_with_thresh + +def addWhiteNoise(amplitude : float = 1., seed : int = None, + to_solve : bool = True): + def add_noise(engine): + try: + is_engine = "solve" in dir(engine) + except: + is_engine = False + if is_engine: # apply from outside class + if to_solve: + class engine_with_noise(engine): + def solve(self, *args, **kwargs): + if not hasattr(self, "rng"): + self.rng = random.default_rng(seed) + self.__class__.__name__ = (engine.__name__ + + "+WhiteNoise") + sol = super().solve(*args, **kwargs) + noise = self.rng.standard_normal(sol.shape) + if sol.dtype == complex: + noise = noise + 1.j * self.rng.standard_normal( + noise.shape) + return sol + amplitude * noise + else: + class engine_with_noise(engine): + def b(self, *args, **kwargs): + if not hasattr(self, "rng"): + self.rng = random.default_rng(seed) + self.__class__.__name__ = (engine.__name__ + + "+WhiteNoise") + bb = super().b(*args, **kwargs) + noise = self.rng.standard_normal(bb.shape) + if bb.dtype == complex: + noise = noise + 1.j * self.rng.standard_normal( + noise.shape) + return bb + amplitude * noise + return engine_with_noise + # apply from within class + @wraps(engine) + def solve_with_noise(self, *args, **kwargs): + if not hasattr(self, "rng"): + self.rng = random.default_rng(seed) + self.__class__.__name__ += "+WhiteNoise" + sol = engine(self, *args, **kwargs) + noise = self.rng.standard_normal(sol.shape) + if sol.dtype == complex: + noise = noise + 1.j * self.rng.standard_normal(noise.shape) + return sol + amplitude * noise + return solve_with_noise + return add_noise diff --git a/rrompy/utilities/exception_manager/warning_manager.py b/rrompy/utilities/exception_manager/warning_manager.py index ec414a2..8be18f3 100644 --- a/rrompy/utilities/exception_manager/warning_manager.py +++ b/rrompy/utilities/exception_manager/warning_manager.py @@ -1,35 +1,32 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import traceback as tb __all__ = ['RROMPyWarning'] def RROMPyWarning(msg : str = "", stacklevel : int = 0): - from rrompy.utilities.base.verbosity_depth import getTimestamp + from rrompy.utilities.base.verbosity_depth import verbosityDepth frameSummary = tb.extract_stack()[- 3 - stacklevel] - timestamp = getTimestamp() if frameSummary.name == "": name = "" else: name = ", within {}".format(frameSummary.name) - print("{} \x1b[3mWarning at {}:{}{}:\x1b[0m".format(timestamp, - frameSummary.filename, - frameSummary.lineno, - name)) - print("> \x1b[31m{}\x1b[0m".format(frameSummary.line)) - if len(msg) > 0: - print(" \x1b[3m{}\x1b[0m\n".format(msg)) + verbosityDepth("INIT", " \x1b[3mWarning at {}:{}{}:\x1b[0m".format( + frameSummary.filename, frameSummary.lineno, name)) + verbosityDepth("DEL" * (len(msg) == 0) + "MAIN" * (len(msg) > 0), + " > \x1b[31m{}\x1b[0m".format(frameSummary.line)) + if len(msg) > 0: verbosityDepth("DEL", " \x1b[3m{}\x1b[0m".format(msg)) diff --git a/rrompy/utilities/parallel/base.py b/rrompy/utilities/parallel/base.py index d8b2e4c..476ed2d 100644 --- a/rrompy/utilities/parallel/base.py +++ b/rrompy/utilities/parallel/base.py @@ -1,67 +1,67 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from rrompy.utilities.base.verbosity_depth import updateVerbosityCheckpoint try: from mpi4py import MPI _MPI_IS_LOADED = True COMM, SELF = MPI.COMM_WORLD, MPI.COMM_SELF except ImportError: _MPI_IS_LOADED = False COMM, SELF = None, None __all__ = ['_MPI_IS_LOADED', 'COMM', 'SELF', 'poolRank', 'poolSize', 'masterCore', 'updateSerialIndex', 'forceSerialExecution', 'allowParallelExecution', 'forcedSerial'] def poolRank() -> int: if _MPI_IS_LOADED: return COMM.Get_rank() return 0 def poolSize() -> int: if _MPI_IS_LOADED: return COMM.Get_size() return 1 def masterCore() -> bool: return forcedSerial() or poolRank() == 0 def updateSerialIndex(n : int = 0, delta : bool = True): global RROMPy_force_serial if "RROMPy_force_serial" not in globals(): RROMPy_force_serial = 0 if delta: RROMPy_force_serial += n else: RROMPy_force_serial = n if RROMPy_force_serial <= 0: del RROMPy_force_serial def forceSerialExecution(verbosity : bool = False): updateSerialIndex(1) - if verbosity: updateVerbosityCheckpoint(1) + if verbosity and poolSize() > 1: updateVerbosityCheckpoint(1) def allowParallelExecution(verbosity : bool = False): updateSerialIndex(-1) - if verbosity: + if verbosity and poolSize() > 1: buffer = updateVerbosityCheckpoint(-1) if buffer is not None: msg = COMM.gather(buffer, root = 0) if masterCore(): out = "" for ms in msg: out += ms print(out, end = "", flush = True) def forcedSerial(): global RROMPy_force_serial return "RROMPy_force_serial" in globals() and RROMPy_force_serial > 0