diff --git a/rrompy/hfengines/base/hfengine_base.py b/rrompy/hfengines/base/hfengine_base.py index 628cab9..2a14caa 100644 --- a/rrompy/hfengines/base/hfengine_base.py +++ b/rrompy/hfengines/base/hfengine_base.py @@ -1,367 +1,367 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np import scipy.sparse as scsp from numbers import Number from collections.abc import Iterable from copy import copy as softcopy from rrompy.utilities.base.decorators import (nonaffine_construct, mu_independent) from rrompy.utilities.base.types import (Np1D, Np2D, List, DictAny, paramVal, paramList, sampList) from rrompy.utilities.numerical import solve as tsolve, dot from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.sampling.sample_list import sampleList from rrompy.parameter import (checkParameter, checkParameterList, parameterList, parameterMap as pMap) from rrompy.solver.linear_solver import setupSolver from rrompy.utilities.parallel import (poolRank, masterCore, listScatter, matrixGatherv, isend, recv) __all__ = ['HFEngineBase'] class HFEngineBase: """Generic solver for parametric problems.""" def __init__(self, verbosity : int = 10, timestamp : bool = True): self.verbosity = verbosity self.timestamp = timestamp self.setSolver("SPSOLVE", {"use_umfpack" : False}) self.npar = 0 self._C = None self.outputNormMatrix = 1. def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def __dir_base__(self): return [x for x in self.__dir__() if x[:2] != "__"] def __deepcopy__(self, memo): return softcopy(self) @property def npar(self): """Value of npar.""" return self._npar @npar.setter def npar(self, npar): nparOld = self._npar if hasattr(self, "_npar") else -1 if npar != nparOld: self.parameterMap = pMap(1., npar) self._npar = npar @property def spacedim(self): return 1 def checkParameter(self, mu:paramVal) -> paramVal: muP = checkParameter(mu, self.npar) if self.npar == 0: muP.reset((1, 0), muP.dtype) return muP def checkParameterList(self, mu:paramList, check_if_single : bool = False) -> paramList: muL = checkParameterList(mu, self.npar, check_if_single) return muL def mapParameterList(self, mu:paramList, direct : str = "F", idx : List[int] = None) -> paramList: if idx is None: idx = np.arange(self.npar) muMapped = checkParameterList(mu, len(idx)) for j, d in enumerate(idx): muMapped.data[:, j] = expressionEvaluator( self.parameterMap[direct][d], muMapped(j)).flatten() return muMapped def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ self.energyNormMatrix = 1. def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ self.energyNormDualMatrix = 1. def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False, dual : bool = False, is_state : bool = True) -> Np2D: """Scalar product.""" if is_state or self.isCEye: if dual: if not hasattr(self, "energyNormDualMatrix"): self.buildEnergyNormDualForm() energyMat = self.energyNormDualMatrix else: if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() energyMat = self.energyNormMatrix else: energyMat = self.outputNormMatrix if isinstance(u, (parameterList, sampleList)): u = u.data if isinstance(v, (parameterList, sampleList)): v = v.data if onlyDiag: return np.sum(dot(energyMat, u) * v.conj(), axis = 0) return dot(dot(energyMat, u).T, v.conj()).T def norm(self, u:Np2D, dual : bool = False, is_state : bool = True) -> Np1D: return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual, is_state = is_state)) ** .5 def baselineA(self): """Return 0 of shape consistent with operator of linear system.""" if (hasattr(self, "As") and isinstance(self.As, Iterable) and self.As[0] is not None): d = self.As[0].shape[0] else: d = self.spacedim return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)), shape = (d, d), dtype = np.complex) def baselineb(self): """Return 0 of shape consistent with RHS of linear system.""" return np.zeros(self.spacedim, dtype = np.complex) @nonaffine_construct @abstractmethod def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ return @nonaffine_construct @abstractmethod def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ return @mu_independent def C(self, mu:paramVal): """ Value of C. Should be overridden (with something like return self._C(mu) ) if a mu-dependent C is needed. """ if self._C is None: self._C = 1. return self._C def setCQuadratic(self, quad : bool = True): if quad: self._is_C_quadratic = True elif hasattr(self, "_is_C_quadratic"): del self._is_C_quadratic @property def isCEye(self): """ Whether the action of C can be seen as a scalar multiplication. Should be overridden (with return True ) if a mu-dependent scalar C is used. """ return isinstance(self._C, Number) def applyC(self, u:sampList, mu:paramVal): """ Apply LHS of linear system. If a mu-dependent scalar C is used in approximant computation with POD = 1, it must NOT depend on the pivot variable(s). """ if hasattr(self, "_is_C_quadratic") and self._is_C_quadratic: skip_cc = (hasattr(self, "_is_C_selfadjoint") and self._is_C_selfadjoint) u, C = np.atleast_2d(u), self.C(mu) N = u.shape[1] if C.ndim != 2: - raise RROMPyException(("C array for quadratic output must ", + raise RROMPyException(("C array for quadratic output must " "have 2 dimensions.")) for j in range(N): if skip_cc: cj = dot(dot(C, u[:, j]), u[:, j :].conj()) else: cj = dot(dot(C, u[:, j]), u.conj()) if j == 0: res = np.empty(N ** 2, dtype = cj.dtype) if skip_cc: res[j * (j + 1)] = cj[0] for l in range(j + 1, N): res[l * (l + 2) - j] = cj[l - j] res[l ** 2 + j] = cj[l - j].conj() else: for l in range(N): res[max(l, j) * (max(l, j) + 1) + l - j] = cj[l] return np.expand_dims(res, 0) return dot(self.C(mu), u) def setSolver(self, solverType:str, solverArgs : DictAny = {}): """Choose solver type and parameters.""" self._solver, self._solverArgs = setupSolver(solverType, solverArgs) def solve(self, mu : paramList = [], RHS : sampList = None, return_state : bool = False) -> sampList: """ Find solution of linear system. Args: mu: parameter value. RHS: RHS of linear system. If None, defaults to that of parametric system. Defaults to None. return_state: whether to return state before multiplication by c. Defaults to False. """ from rrompy.sampling import sampleList, emptySampleList if mu == []: mu = self.mu0 mu = self.checkParameterList(mu) if len(mu) == 0: return emptySampleList() mu = self.checkParameterList(mu) mu_loc, idx, sizes = listScatter(mu, return_sizes = True) mu_loc = self.checkParameterList(mu_loc) req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(mu_loc) == 0: uL, uT = recv(source = 0, tag = poolRank()) sol = np.empty((uL, 0), dtype = uT) else: applyCglob = (hasattr(self, "_is_C_quadratic") and self._is_C_quadratic) if RHS is None: # build RHSs RHS = sampleList([self.b(m) for m in mu_loc]) else: RHS = sampleList(RHS) if len(RHS) > 1: RHS = sampleList([RHS[i] for i in idx]) mult = 0 if len(RHS) == 1 else 1 RROMPyAssert(mult * (len(mu_loc) - 1) + 1, len(RHS), "Sample size") for j, mj in enumerate(mu_loc): u = tsolve(self.A(mj), RHS[mult * j], self._solver, self._solverArgs) if not (return_state or applyCglob): u = self.applyC(u, mj) if j == 0: sol = np.empty((len(u), len(mu_loc)), dtype = u.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(u), u.dtype), dest = dest, tag = dest)] sol[:, j] = u for r in req: r.wait() sol = matrixGatherv(sol, sizes) if not return_state and applyCglob: sol = self.applyC(sol, mu) return sampleList(sol) def residual(self, mu : paramList = [], u : sampList = None, post_c : bool = True) -> sampList: """ Find residual of linear system for given approximate solution. Args: mu: parameter value. u: numpy complex array with function dofs. If None, set to 0. post_c: whether to post-process using c. Defaults to True. """ from rrompy.sampling import sampleList, emptySampleList if mu == []: mu = self.mu0 mu = self.checkParameterList(mu) if len(mu) == 0: return emptySampleList() mu_loc, idx, sizes = listScatter(mu, return_sizes = True) mu_loc = self.checkParameterList(mu_loc) req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(mu_loc) == 0: uL, uT = recv(source = 0, tag = poolRank()) res = np.empty((uL, 0), dtype = uT) else: applyCglob = (hasattr(self, "_is_C_quadratic") and self._is_C_quadratic) v = sampleList(np.zeros((self.spacedim, len(mu_loc)))) if u is not None: u = sampleList(u) v = v + sampleList([u[i] for i in idx]) for j, (mj, vj) in enumerate(zip(mu_loc, v)): r = self.b(mj) - dot(self.A(mj), vj) if post_c and not applyCglob: r = self.applyC(r, mj) if j == 0: res = np.empty((len(r), len(mu_loc)), dtype = r.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(r), r.dtype), dest = dest, tag = dest)] res[:, j] = r for r in req: r.wait() res = matrixGatherv(res, sizes) if post_c and applyCglob: res = self.applyC(res, mu) return sampleList(res) cutOffPolesRMax,cutOffPolesRMin = np.inf, - np.inf cutOffPolesRMaxRel, cutOffPolesRMinRel = np.inf, - np.inf cutOffPolesIMax, cutOffPolesIMin = np.inf, - np.inf cutOffPolesIMaxRel, cutOffPolesIMinRel = np.inf, - np.inf cutOffResNormMin = -1 def flagBadPolesResidues(self, poles:Np1D, residues : Np1D = None, relative : bool = False) -> Np1D: """ Flag (numerical) poles/residues which are impossible. Args: poles: poles to be judged. residues: residues to be judged. relative: whether relative values should be used for poles. """ poles = np.array(poles).flatten() flag = np.zeros(len(poles), dtype = bool) if residues is None: self._ignoreResidues = self.cutOffResNormMin <= 0. if relative: RMax, RMin = self.cutOffPolesRMaxRel, self.cutOffPolesRMinRel IMax, IMin = self.cutOffPolesIMaxRel, self.cutOffPolesIMinRel else: RMax, RMin = self.cutOffPolesRMax, self.cutOffPolesRMin IMax, IMin = self.cutOffPolesIMax, self.cutOffPolesIMin if not np.isinf(RMax): flag = np.logical_or(flag, np.real(poles) > RMax) if not np.isinf(RMin): flag = np.logical_or(flag, np.real(poles) < RMin) if not np.isinf(IMax): flag = np.logical_or(flag, np.imag(poles) > IMax) if not np.isinf(IMin): flag = np.logical_or(flag, np.imag(poles) < IMin) else: residues = np.array(residues).reshape(len(poles), -1) if self.cutOffResNormMin > 0.: if residues.shape[1] == self.spacedim: resEff = self.norm(residues.T) else: resEff = np.linalg.norm(residues, axis = 1) resEff /= np.max(resEff) flag = np.logical_or(flag, resEff < self.cutOffResNormMin) return flag diff --git a/rrompy/reduction_methods/base/trained_model/trained_model.py b/rrompy/reduction_methods/base/trained_model/trained_model.py index 5804af6..6fcbf78 100644 --- a/rrompy/reduction_methods/base/trained_model/trained_model.py +++ b/rrompy/reduction_methods/base/trained_model/trained_model.py @@ -1,132 +1,134 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np from rrompy.utilities.base.types import Np1D, List, paramList, sampList from rrompy.utilities.expression import expressionEvaluator from rrompy.parameter import checkParameterList from rrompy.sampling import sampleList __all__ = ['TrainedModel'] class TrainedModel: """ ABSTRACT ROM approximant evaluation. Attributes: Data: dictionary with all that can be pickled. """ def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def reset(self): self.lastSolvedApproxReduced = None self.lastSolvedApprox = None def compress(self, collapse : bool = False, tol : float = 0.): if collapse: self.data.projMat = 1. self.data._collapsed = True if tol > 0.: self.data._compressTol = tol @property def npar(self): """Number of parameters.""" return self.data.mu0.shape[1] def checkParameterList(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.npar, check_if_single) def mapParameterList(self, mu:paramList, direct : str = "F", idx : List[int] = None) -> paramList: if idx is None: idx = np.arange(self.npar) muMapped = checkParameterList(mu, len(idx)) for j, d in enumerate(idx): muMapped.data[:, j] = expressionEvaluator( self.data.parameterMap[direct][d], muMapped(j)).flatten() return muMapped @abstractmethod def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. (ABSTRACT) Args: mu: Target parameter. """ pass + def _setupQuadMapping(self, N:int): + if (not hasattr(self, "quad_mapping") or self.quad_mapping.shape[1] != N ** 2): + idx = np.arange(N ** 2) + base = np.floor(idx ** .5) + above = base * (base + 1) - idx + li = base - above * (above > 0) + ri = base + above * (above < 0) + self.quad_mapping = np.vstack([li, ri]).astype(int) + def getApprox(self, mu : paramList = []) -> sampList: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApprox") or self.lastSolvedApprox != mu): uApproxR = self.getApproxReduced(mu) if self.data._collapsed: self.uApprox = uApproxR else: for i, uApR in enumerate(uApproxR): if not (hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic): uApREff = uApR else: - if (not hasattr(self, "quad_mapping") - or self.quad_mapping.shape[1] != len(uApR) ** 2): - idx = np.arange(len(uApR) ** 2) - base = np.floor(idx ** .5) - above = base * (base + 1) - idx - li = base - above * (above > 0) - ri = base + above * (above < 0) - self.quad_mapping = np.vstack([li, ri]).astype(int) + self._setupQuadMapping(len(uApR)) uApREff = (uApR[self.quad_mapping[0]] * uApR[self.quad_mapping[1]].conj()) uAp = self.data.projMat[:, : uApREff.shape[0]].dot(uApREff) if i == 0: uApprox = np.empty((len(uAp), len(uApproxR)), dtype = uAp.dtype) uApprox[:, i] = uAp self.uApprox = sampleList(uApprox) self.lastSolvedApprox = mu return self.uApprox @abstractmethod def getPoles(self) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ pass diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py index 4f0f036..aca80de 100644 --- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py +++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational.py @@ -1,293 +1,298 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import warnings import numpy as np from scipy.sparse import csr_matrix, hstack, SparseEfficiencyWarning from copy import deepcopy as copy from .trained_model_pivoted_rational_nomatch import ( TrainedModelPivotedRationalNoMatch) from rrompy.utilities.base.types import (Np2D, ListAny, paramVal, paramList, HFEng) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical.point_matching import rationalFunctionMatching from rrompy.utilities.numerical.degree import reduceDegreeN from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.heaviside import (heavisideUniformShape, HeavisideInterpolator as HI) from rrompy.utilities.poly_fitting.nearest_neighbor import ( NearestNeighborInterpolator as NNI) from rrompy.utilities.poly_fitting.piecewise_linear import (sparsekinds, PiecewiseLinearInterpolator as PLI) from rrompy.utilities.exception_manager import RROMPyException __all__ = ['TrainedModelPivotedRational'] class TrainedModelPivotedRational(TrainedModelPivotedRationalNoMatch): """ ROM approximant evaluation for pivoted approximants based on interpolation of rational approximants (with pole matching). Attributes: Data: dictionary with all that can be pickled. """ def centerNormalizeMarginal(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal. Returns: Normalized parameter. """ mu = self.checkParameterListMarginal(mu) if mu0 is None: mu0 = self.checkParameterListMarginal( self.data.mu0(0, self.data.directionMarginal)) return (self.mapParameterList(mu, idx = self.data.directionMarginal) - self.mapParameterList(mu0, idx = self.data.directionMarginal) ) / [self.data.scaleFactor[x] for x in self.data.directionMarginal] def setupMarginalInterp(self, approx, interpPars:ListAny, extraPar = None): vbMng(self, "INIT", "Starting computation of marginal interpolator.", 12) musMCN = self.centerNormalizeMarginal(self.data.musMarginal) nM, pbM = len(musMCN), approx.polybasisMarginal if pbM in ppb + rbpb: if extraPar: approx._setMMarginalAuto() _MMarginalEff = approx.paramsMarginal["MMarginal"] if pbM in ppb: p = PI() elif pbM in rbpb: p = RBI() else: # if pbM in sparsekinds + ["NEARESTNEIGHBOR"]: if pbM == "NEARESTNEIGHBOR": p = NNI() else: # if pbM in sparsekinds: pllims = [[-1.] * self.data.nparMarginal, [1.] * self.data.nparMarginal] p = PLI() for ipts, pts in enumerate(self.data.suppEffPts): if len(pts) == 0: raise RROMPyException("Empty list of support points.") musMCNEff, valsEff = musMCN[pts], np.eye(len(pts)) if pbM in ppb + rbpb: if extraPar: if ipts > 0: verb = approx.verbosity approx.verbosity = 0 _musM = approx.musMarginal approx.musMarginal = musMCNEff approx._setMMarginalAuto() approx.musMarginal = _musM approx.verbosity = verb else: approx.paramsMarginal["MMarginal"] = reduceDegreeN( _MMarginalEff, len(musMCNEff), self.data.nparMarginal, approx.paramsMarginal["polydegreetypeMarginal"]) MMEff = approx.paramsMarginal["MMarginal"] while MMEff >= 0: wellCond, msg = p.setupByInterpolation(musMCNEff, valsEff, MMEff, *interpPars) vbMng(self, "MAIN", msg, 30) if wellCond: break vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing " "MMarginal by 1."), 35) MMEff -= 1 if MMEff < 0: raise RROMPyException(("Instability in computation of " "interpolant. Aborting.")) if (pbM in rbpb and len(interpPars) > 4 and "optimizeScalingBounds" in interpPars[4].keys()): interpPars[4]["optimizeScalingBounds"] = [-1., -1.] elif pbM == "NEARESTNEIGHBOR": if ipts > 0: interpPars[0] = 1 p.setupByInterpolation(musMCNEff, valsEff, *interpPars) elif ipts == 0: # and pbM in sparsekinds: p.setupByInterpolation(musMCNEff, valsEff, pllims, extraPar[pts], *interpPars) if ipts == 0: self.data.marginalInterp = copy(p) self.data.coeffsEff, self.data.polesEff = [], [] for hi, sup in zip(self.data.HIs, self.data.Psupp): cEff = hi.coeffs if (self.data._collapsed or self.data.projMat.shape[1] == cEff.shape[1]): cEff = copy(cEff) else: supC = self.data.projMat.shape[1] - sup - cEff.shape[1] cEff = hstack((csr_matrix((len(cEff), sup)), csr_matrix(cEff), csr_matrix((len(cEff), supC))), "csr") self.data.coeffsEff += [cEff] self.data.polesEff += [copy(hi.poles)] else: ptsBad = [i for i in range(nM) if i not in pts] idxBad = np.where(self.data.suppEffIdx == ipts)[0] warnings.simplefilter('ignore', SparseEfficiencyWarning) if pbM in sparsekinds: for ij, j in enumerate(ptsBad): nearest = pts[np.argmin(np.sum(np.abs(musMCNEff.data - np.tile(musMCN[j], [len(pts), 1]) ), axis = 1).flatten())] self.data.coeffsEff[j][idxBad] = copy( self.data.coeffsEff[nearest][idxBad]) self.data.polesEff[j][idxBad] = copy( self.data.polesEff[nearest][idxBad]) else: if (self.data._collapsed or self.data.projMat.shape[1] == cEff.shape[1]): cfBase = np.zeros((len(idxBad), cEff.shape[1]), dtype = cEff.dtype) else: cfBase = csr_matrix((len(idxBad), self.data.projMat.shape[1]), dtype = cEff.dtype) valMuMBad = p(musMCN[ptsBad]) for ijb, jb in enumerate(ptsBad): self.data.coeffsEff[jb][idxBad] = copy(cfBase) self.data.polesEff[jb][idxBad] = 0. for ij, j in enumerate(pts): val = valMuMBad[ij][ijb] if not np.isclose(val, 0.): self.data.coeffsEff[jb][idxBad] += (val * self.data.coeffsEff[j][idxBad]) self.data.polesEff[jb][idxBad] += (val * self.data.polesEff[j][idxBad]) warnings.filters.pop(0) if pbM in ppb + rbpb: approx.paramsMarginal["MMarginal"] = _MMarginalEff vbMng(self, "DEL", "Done computing marginal interpolator.", 12) def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny, basis:str, matchingWeight:float, HFEngine:HFEng, is_state:bool): """Initialize Heaviside representation.""" Ns = [len(pls) for pls in poles] poles, coeffs = heavisideUniformShape(poles, coeffs) root = Ns.index(len(poles[0])) + if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic: + self._setupQuadMapping(coeffs[0].shape[-1]) + projMapping = self.quad_mapping + else: + projMapping = None poles, coeffs = rationalFunctionMatching(poles, coeffs, self.data.musMarginal.data, matchingWeight, supps, self.data.projMat, HFEngine, - is_state, root) + is_state, root, projMapping) super().initializeFromLists(poles, coeffs, supps, basis) self.data.suppEffPts = [np.arange(len(self.data.HIs))] self.data.suppEffIdx = np.zeros(len(poles[0]), dtype = int) def checkSharedRatio(self, shared:float) -> str: N = len(self.data.HIs[0].poles) M = len(self.data.HIs) goodLocPoles = np.array([np.logical_not(np.isinf(hi.poles) ) for hi in self.data.HIs]) self.data.suppEffPts = [np.arange(len(self.data.HIs))] self.data.suppEffIdx = np.zeros(N, dtype = int) if np.all(np.all(goodLocPoles)): return " No poles erased." goodGlobPoles = np.sum(goodLocPoles, axis = 0) goodEnoughPoles = goodGlobPoles >= max(1., 1. * shared * M) keepPole = np.where(goodEnoughPoles)[0] halfPole = np.where(np.logical_and(goodEnoughPoles, goodGlobPoles < M))[0] removePole = np.where(np.logical_not(goodEnoughPoles))[0] if len(removePole) > 0: keepCoeff = np.append(keepPole, np.arange(N, len(self.data.HIs[0].coeffs))) for hi in self.data.HIs: for j in removePole: if not np.isinf(hi.poles[j]): hi.coeffs[N, :] -= hi.coeffs[j, :] / hi.poles[j] hi.poles = hi.poles[keepPole] hi.coeffs = hi.coeffs[keepCoeff, :] for idxR in halfPole: pts = np.where(goodLocPoles[:, idxR])[0] idxEff = len(self.data.suppEffPts) for idEff, prevPts in enumerate(self.data.suppEffPts): if len(prevPts) == len(pts): if np.allclose(prevPts, pts): idxEff = idEff break if idxEff == len(self.data.suppEffPts): self.data.suppEffPts += [pts] self.data.suppEffIdx[idxR] = idxEff self.data.suppEffIdx = self.data.suppEffIdx[keepPole] return (" Hard-erased {} pole".format(len(removePole)) + "s" * (len(removePole) != 1) + " and soft-erased {} pole".format(len(halfPole)) + "s" * (len(halfPole) != 1) + ".") def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant interpolator.""" mu = self.checkParameterListMarginal(mu) vbMng(self, "INIT", "Interpolating marginal models at mu = {}.".format(mu), 95) his = [] muC = self.centerNormalizeMarginal(mu) mIvals = self.data.marginalInterp(muC) verb, self.verbosity = self.verbosity, 0 poless = self.interpolateMarginalPoles(mu, mIvals) coeffss = self.interpolateMarginalCoeffs(mu, mIvals) self.verbosity = verb for j in range(len(mu)): his += [HI()] his[-1].poles = poless[j] his[-1].coeffs = coeffss[j] his[-1].npar = 1 his[-1].polybasis = self.data.HIs[0].polybasis vbMng(self, "DEL", "Done interpolating marginal models.", 95) return his def interpolateMarginalPoles(self, mu : paramList = [], mIvals : Np2D = None) -> ListAny: """Obtain interpolated approximant poles.""" mu = self.checkParameterListMarginal(mu) vbMng(self, "INIT", "Interpolating marginal poles at mu = {}.".format(mu), 95) intMPoles = np.zeros((len(mu),) + self.data.polesEff[0].shape, dtype = self.data.polesEff[0].dtype) if mIvals is None: muC = self.centerNormalizeMarginal(mu) mIvals = self.data.marginalInterp(muC) for pEff, mI in zip(self.data.polesEff, mIvals): intMPoles += np.expand_dims(mI, - 1) * pEff vbMng(self, "DEL", "Done interpolating marginal poles.", 95) return intMPoles def interpolateMarginalCoeffs(self, mu : paramList = [], mIvals : Np2D = None) -> ListAny: """Obtain interpolated approximant coefficients.""" mu = self.checkParameterListMarginal(mu) vbMng(self, "INIT", "Interpolating marginal coefficients at mu = {}.".format(mu), 95) intMCoeffs = np.zeros((len(mu),) + self.data.coeffsEff[0].shape, dtype = self.data.coeffsEff[0].dtype) if mIvals is None: muC = self.centerNormalizeMarginal(mu) mIvals = self.data.marginalInterp(muC) for cEff, mI in zip(self.data.coeffsEff, mIvals): for j, m in enumerate(mI): intMCoeffs[j] += m * cEff vbMng(self, "DEL", "Done interpolating marginal coefficients.", 95) return intMCoeffs diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py index 62b45be..7ec063b 100644 --- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py +++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted_rational_nomatch.py @@ -1,328 +1,331 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.special import factorial as fact from collections.abc import Iterable from copy import deepcopy as copy from itertools import combinations from rrompy.reduction_methods.standard.trained_model.trained_model_rational \ import TrainedModelRational from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.poly_fitting.heaviside import (rational2heaviside, HeavisideInterpolator as HI) from rrompy.utilities.poly_fitting.nearest_neighbor import ( NearestNeighborInterpolator as NNI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList from rrompy.sampling import sampleList, emptySampleList __all__ = ['TrainedModelPivotedRationalNoMatch'] class TrainedModelPivotedRationalNoMatch(TrainedModelRational): """ ROM approximant evaluation for pivoted approximants based on interpolation of rational approximants (without pole matching). Attributes: Data: dictionary with all that can be pickled. """ def checkParameterListPivot(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparPivot, check_if_single) def checkParameterListMarginal(self, mu:paramList, check_if_single : bool = False) -> paramList: return checkParameterList(mu, self.data.nparMarginal, check_if_single) def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if not collapse and tol <= 0.: return + if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic: + raise RROMPyException(("Cannot compress model with quadratic " + "output.")) RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args, **kwargs) for obj, suppj in zip(self.data.HIs, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_HIsExcl"): for obj, suppj in zip(self._HIsExcl, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self.data, "Ps"): for obj, suppj in zip(self.data.Ps, self.data.Psupp): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self, "_PsExcl"): for obj, suppj in zip(self._PsExcl, self._PsuppExcl): obj.postmultiplyTensorize(RMat.T[suppj : suppj + obj.shape[0]]) if hasattr(self.data, "coeffsEff"): for j in range(len(self.data.coeffsEff)): self.data.coeffsEff[j] = dot(self.data.coeffsEff[j], RMat.T) if hasattr(self, "_HIsExcl") or hasattr(self, "_PsExcl"): self._PsuppExcl = [0] * len(self._PsuppExcl) self.data.Psupp = [0] * len(self.data.Psupp) super(TrainedModelRational, self).compress(collapse, tol) def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = self.checkParameterListPivot(mu) if mu0 is None: mu0 = self.checkParameterListPivot( self.data.mu0(0, self.data.directionPivot)) return (self.mapParameterList(mu, idx = self.data.directionPivot) - self.mapParameterList(mu0, idx = self.data.directionPivot) ) / [self.data.scaleFactor[x] for x in self.data.directionPivot] def setupMarginalInterp(self, interpPars:ListAny): self.data.marginalInterp = NNI() self.data.marginalInterp.setupByInterpolation(self.data.musMarginal, np.arange(len(self.data.musMarginal)), 1, *interpPars) def updateEffectiveSamples(self, exclude:List[int], *args, **kwargs): if hasattr(self, "_idxExcl"): for j, excl in enumerate(self._idxExcl): self.data.musMarginal.insert(self._musMExcl[j], excl) self.data.HIs.insert(excl, self._HIsExcl[j]) self.data.Ps.insert(excl, self._PsExcl[j]) self.data.Qs.insert(excl, self._QsExcl[j]) self.data.Psupp.insert(excl, self._PsuppExcl[j]) self._idxExcl, self._musMExcl = list(np.sort(exclude)), [] self._HIsExcl, self._PsExcl, self._QsExcl = [], [], [] self._PsuppExcl = [] for excl in self._idxExcl[::-1]: self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl self.data.musMarginal.pop(excl) self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl self._PsuppExcl = [self.data.Psupp.pop(excl)] + self._PsuppExcl poles = [hi.poles for hi in self.data.HIs] coeffs = [hi.coeffs for hi in self.data.HIs] self.initializeFromLists(poles, coeffs, self.data.Psupp, self.data.HIs[0].polybasis, *args, **kwargs) def initializeFromLists(self, poles:ListAny, coeffs:ListAny, supps:ListAny, basis:str, *args, **kwargs): """Initialize Heaviside representation.""" self.data.HIs = [] for pls, cfs in zip(poles, coeffs): hsi = HI() hsi.poles = pls if len(cfs) == len(pls): cfs = np.pad(cfs, ((0, 1), (0, 0)), "constant") hsi.coeffs = cfs hsi.npar = 1 hsi.polybasis = basis self.data.HIs += [hsi] def initializeFromRational(self, *args, **kwargs): """Initialize Heaviside representation.""" RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") poles, coeffs = [], [] for Q, P in zip(self.data.Qs, self.data.Ps): cfs, pls, basis = rational2heaviside(P, Q) poles += [pls] coeffs += [cfs] self.initializeFromLists(poles, coeffs, self.data.Psupp, basis, *args, **kwargs) def interpolateMarginalInterpolator(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant interpolator.""" mu = self.checkParameterListMarginal(mu) vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu), 95) his = [] intM = np.array(self.data.marginalInterp(mu), dtype = int) for j in range(len(mu)): i = intM[j] his += [HI()] his[-1].poles = copy(self.data.HIs[i].poles) his[-1].coeffs = copy(self.data.HIs[i].coeffs) his[-1].npar = 1 his[-1].polybasis = self.data.HIs[0].polybasis if not self.data._collapsed: his[-1].pad(self.data.Psupp[i], self.data.projMat.shape[1] - self.data.Psupp[i] - his[-1].shape[0]) vbMng(self, "DEL", "Done finding nearest neighbor.", 95) return his def interpolateMarginalPoles(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant poles.""" interps = self.interpolateMarginalInterpolator(mu) return [interp.poles for interp in interps] def interpolateMarginalCoeffs(self, mu : paramList = []) -> ListAny: """Obtain interpolated approximant poles.""" interps = self.interpolateMarginalInterpolator(mu) return [interp.coeffs for interp in interps] def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = mu(self.data.directionMarginal) his = self.interpolateMarginalInterpolator(muM) for i, (mP, hi) in enumerate(zip(muP, his)): uAppR = hi(mP)[:, 0] if i == 0: uApproxR = np.empty((len(uAppR), len(mu)), dtype = uAppR.dtype) uApproxR[:, i] = uAppR self.uApproxReduced = sampleList(uApproxR) vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) p = emptySampleList() muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = mu(self.data.directionMarginal) his = self.interpolateMarginalInterpolator(muM) for i, (mP, hi) in enumerate(zip(muP, his)): Pval = hi(mP) * np.prod(mP[0] - hi.poles) if i == 0: p.reset((len(Pval), len(mu)), dtype = Pval.dtype) p[i] = Pval return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = self.checkParameterList(mu) muP = self.centerNormalizePivot(mu(self.data.directionPivot)) muM = mu(self.data.directionMarginal) if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] derVal = np.zeros(len(mu), dtype = np.complex) pls = self.interpolateMarginalPoles(muM) for i, (mP, pl) in enumerate(zip(muP, pls)): N = len(pl) if derP == N: derVal[i] = 1. elif derP >= 0 and derP < N: plDist = muP[0] - pl for terms in combinations(np.arange(N), N - derP): derVal[i] += np.prod(plDist[list(terms)], axis = 1) return sclP ** derP * fact(derP) * derVal def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim)[0] mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] roots = (self.data.scaleFactor[rDim] * self.interpolateMarginalPoles(mMarg)[0]) return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim), idx = [rDim])(0, 0) + roots, "B", [rDim])(0) def getResidues(self, *args, **kwargs) -> Np2D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] res = self.interpolateMarginalCoeffs(mMarg)[0][: len(pls), :] if not self.data._collapsed: res = self.data.projMat.dot(res.T).T return pls, res diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_nearest_neighbor.py b/rrompy/reduction_methods/standard/trained_model/trained_model_nearest_neighbor.py index d99eeca..5d37ff5 100644 --- a/rrompy/reduction_methods/standard/trained_model/trained_model_nearest_neighbor.py +++ b/rrompy/reduction_methods/standard/trained_model/trained_model_nearest_neighbor.py @@ -1,94 +1,97 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.reduction_methods.base.trained_model.trained_model import ( TrainedModel) from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.base.types import Np1D, paramList, sampList from rrompy.utilities.base import verbosityManager as vbMng -from rrompy.utilities.exception_manager import RROMPyWarning +from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.sampling import sampleList __all__ = ['TrainedModelNearestNeighbor'] class TrainedModelNearestNeighbor(TrainedModel): """ ROM approximant evaluation for nearest neighbor approximant. Attributes: Data: dictionary with all that can be pickled. """ def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if not collapse and tol <= 0.: return + if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic: + raise RROMPyException(("Cannot compress model with quadratic " + "output.")) RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args, **kwargs) for j, suppj in enumerate(self.data.supp): self.data.vals[j] = RMat[:, suppj : suppj + len(self.data.vals[j]) ].dot(self.data.vals[j]) self.data.supp[j] = [0] super().compress(collapse, tol) def getNearestNeighbor(self, mu : paramList = []) -> Np1D: """ Find nearest neighbor to arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) vbMng(self, "INIT", "Finding nearest neighbor to mu = {}.".format(mu), 22) nn = [] intM = np.array(self.data.NN(mu), dtype = int) for iM in intM: val, supp = self.data.vals[iM], self.data.supp[iM] nn += [np.pad(val, (supp, self.data.projMat.shape[1] - supp - len(val)), 'constant')] nn = sampleList(nn) vbMng(self, "DEL", "Done finding nearest neighbor.", 22) return nn def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) self.uApproxReduced = self.getNearestNeighbor(mu) vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, *args, **kwargs) -> Np1D: """Obtain approximant poles.""" return np.empty(0) diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py index a68fd29..51e8918 100644 --- a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py +++ b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py @@ -1,188 +1,194 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from collections.abc import Iterable from rrompy.reduction_methods.base.trained_model.trained_model import ( TrainedModel) from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.base.types import (Np1D, Np2D, List, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import emptyParameterList from rrompy.sampling import sampleList __all__ = ['TrainedModelRational'] class TrainedModelRational(TrainedModel): """ ROM approximant evaluation for rational approximant. Attributes: Data: dictionary with all that can be pickled. """ def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if not collapse and tol <= 0.: return + if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic: + raise RROMPyException(("Cannot compress model with quadratic " + "output.")) RMat = self.data.projMat if not collapse: if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(RMat, tol, *args, **kwargs) self.data.P.postmultiplyTensorize(RMat.T) super().compress(collapse, tol) def centerNormalize(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0. Returns: Normalized parameter. """ mu = self.checkParameterList(mu) if mu0 is None: mu0 = self.data.mu0 return (self.mapParameterList(mu) - self.mapParameterList(mu0)) / self.data.scaleFactor def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17) p = sampleList(self.data.P(self.centerNormalize(mu))) vbMng(self, "DEL", "Done evaluating numerator.", 17) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ mu = self.checkParameterList(mu) vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu), 17) q = self.data.Q(self.centerNormalize(mu), der, scl) vbMng(self, "DEL", "Done evaluating denominator.", 17) return q def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) QV = self.getQVal(mu) QVzero = np.where(QV == 0.)[0] if len(QVzero) > 0: QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0]) self.uApproxReduced = self.getPVal(mu) / QV vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) mVals[rDim] = self.data.mu0(rDim) mVals = list(self.centerNormalize(mVals).data.flatten()) mVals[rDim] = fp roots = self.data.scaleFactor[rDim] * self.data.Q.roots(mVals) return self.mapParameterList(self.mapParameterList(self.data.mu0(rDim), idx = [rDim])(0, 0) + roots, "B", [rDim])(0) def getResidues(self, *args, **kwargs) -> Np2D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] if not isinstance(mVals, Iterable): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) poles = emptyParameterList() poles.reset((len(pls), self.data.npar), dtype = pls.dtype) for k, pl in enumerate(pls): mValsLoc = list(mVals) mValsLoc[rDim] = pl poles[k] = mValsLoc QV = self.getQVal(poles, list(1 * (np.arange(self.data.npar) == rDim))) QVzero = np.where(QV == 0.)[0] if len(QVzero) > 0: RROMPyWarning(("Adjusting residuals to avoid division by " "numerically zero denominator.")) QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0]) - Res = self.getPVal(poles) + res = self.getPVal(poles) / QV + if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic: + self._setupQuadMapping(res.shape[0]) + res.data = (res.data[self.quad_mapping[0]] + * res.data[self.quad_mapping[1]].conj()) if not self.data._collapsed: - Res = sampleList(dot(self.data.projMat[:, : Res.shape[0]], Res)) - res = Res / QV + res.data = dot(self.data.projMat[:, : res.shape[0]], res) return pls, res.T diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py b/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py index d48c91c..aa068e7 100644 --- a/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py +++ b/rrompy/reduction_methods/standard/trained_model/trained_model_reduced_basis.py @@ -1,153 +1,156 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from collections.abc import Iterable from rrompy.reduction_methods.base.trained_model.trained_model import ( TrainedModel) from rrompy.reduction_methods.standard.reduced_basis_utils import ( projectAffineDecomposition) from rrompy.utilities.base.types import (Np1D, ListAny, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical.compress_matrix import compressMatrix from rrompy.utilities.numerical.marginalize_poly_list import ( marginalizePolyList) from rrompy.utilities.numerical.nonlinear_eigenproblem import ( eigvalsNonlinearDense) from rrompy.utilities.expression import expressionEvaluator from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import checkParameter from rrompy.sampling import sampleList from rrompy.utilities.parallel import (poolRank, masterCore, listScatter, matrixGatherv, isend, recv) __all__ = ['TrainedModelReducedBasis'] class TrainedModelReducedBasis(TrainedModel): """ ROM approximant evaluation for RB approximant. Attributes: Data: dictionary with all that can be pickled. """ def reset(self): super().reset() if hasattr(self, "data") and hasattr(self.data, "lastSetupMu"): self.data.lastSetupMu = None def compress(self, collapse : bool = False, tol : float = 0., *args, **kwargs): if collapse: raise RROMPyException("Cannot collapse implicit surrogates.") if tol <= 0.: return + if hasattr(self.data, "_is_C_quadratic") and self.data._is_C_quadratic: + raise RROMPyException(("Cannot compress model with quadratic " + "output.")) if hasattr(self.data, "_compressTol"): RROMPyWarning(("Recompressing already compressed model is " "ineffective. Aborting.")) return self.data.projMat, RMat, _ = compressMatrix(self.data.projMat, tol, *args, **kwargs) self.data.ARBs, self.data.bRBs = projectAffineDecomposition( self.data.ARBs, self.data.bRBs, RMat) super().compress(collapse, tol) def assembleReducedModel(self, mu:paramVal): mu = checkParameter(mu, self.data.npar) if not (hasattr(self.data, "lastSetupMu") and self.data.lastSetupMu == mu): vbMng(self, "INIT", "Assembling reduced model for mu = {}."\ .format(mu), 17) muEff = self.mapParameterList(mu) self.data.ARBmu, self.data.bRBmu = 0., 0. for thA, ARB in zip(self.data.thAs, self.data.ARBs): self.data.ARBmu = (expressionEvaluator(thA[0], muEff) * ARB + self.data.ARBmu) for thb, bRB in zip(self.data.thbs, self.data.bRBs): self.data.bRBmu = (expressionEvaluator(thb[0], muEff) * bRB + self.data.bRBmu) vbMng(self, "DEL", "Done assembling reduced model.", 17) self.data.lastSetupMu = mu def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = self.checkParameterList(mu) if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Computing RB solution at mu = {}.".format(mu), 12) mu, _, sizes = listScatter(mu, return_sizes = True) mu = self.checkParameterList(mu) req, emptyCores = [], np.where(np.logical_not(sizes))[0] if len(mu) == 0: vbMng(self, "MAIN", "Idling.", 37) uL, uT = recv(source = 0, tag = poolRank()) uApproxR = np.empty((uL, 0), dtype = uT) else: for j, mj in enumerate(mu): self.assembleReducedModel(mj) uAppR = np.linalg.solve(self.data.ARBmu, self.data.bRBmu) if j == 0: uApproxR = np.empty((len(uAppR), len(mu)), dtype = uAppR.dtype) if masterCore(): for dest in emptyCores: req += [isend((len(uAppR), uAppR.dtype), dest = dest, tag = dest)] uApproxR[:, j] = uAppR for r in req: r.wait() uApproxR = matrixGatherv(uApproxR, sizes) self.uApproxReduced = sampleList(uApproxR) vbMng(self, "DEL", "Done computing RB solution.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, marginalVals : ListAny = [fp], jSupp : int = 1, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if not self.data.affinePoly: RROMPyWarning(("Unable to compute approximate poles due " "to parametric dependence (detected non-" "polynomial). Change HFEngine.affinePoly to True " "if necessary.")) return if not isinstance(marginalVals, Iterable): marginalVals = [marginalVals] mVals = list(marginalVals) rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) ARBs = self.data.ARBs if self.data.npar > 1: mVals[rDim] = self.data.mu0(rDim) mVals = checkParameter(mVals, return_data = True).flatten() mVals[rDim] = fp ARBs = marginalizePolyList(ARBs, mVals, "auto") ev = eigvalsNonlinearDense(ARBs, jSupp = jSupp, **kwargs) return self.mapParameterList(ev, "B", [rDim])(0) diff --git a/rrompy/utilities/numerical/point_matching.py b/rrompy/utilities/numerical/point_matching.py index 74082c0..db87c38 100644 --- a/rrompy/utilities/numerical/point_matching.py +++ b/rrompy/utilities/numerical/point_matching.py @@ -1,87 +1,94 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.optimize import linear_sum_assignment as LSA from .point_distances import distanceMatrix, chordalMetricAngleMatrix from rrompy.utilities.base.types import Tuple, List, ListAny, Np1D, Np2D, HFEng from rrompy.utilities.exception_manager import RROMPyAssert __all__ = ['pointMatching', 'rationalFunctionMatching'] def pointMatching(distMatrix:Np2D) -> Tuple[Np1D, Np1D]: return LSA(distMatrix) def rationalFunctionMatching(poles:List[Np1D], coeffs:List[Np2D], featPts:Np2D, matchingWeight:float, supps:ListAny, projMat:Np2D, HFEngine : HFEng = None, - is_state : bool = True, root : int = None) \ + is_state : bool = True, root : int = None, + projMapping : Np2D = None) \ -> Tuple[List[Np1D], List[Np2D]]: """ Match poles and residues of a set of rational functions. Args: poles: List of (lists of) poles. coeffs: List of (lists of) residues. featPts: Marginal parameters corresponding to rational models. matchingWeight: Matching weight in distance computation. supps: Support indices for projection matrix. projMat: Projection matrix for residues. HFEngine(optional): Engine for distance evaluation. Defaults to None, i.e. Euclidean metric. is_state(optional): Whether residues are of system state. Defaults to True. root(optional): Root of search tree. Defaults to None, i.e. automatically chosen. + projMapping(optional): Mapping for projection based on projMap. Should + be assigned for nonlinear outputs. Defaults to None Returns: Matched list of (lists of) poles and list of (lists of) residues. """ M, N = len(featPts), len(poles[0]) RROMPyAssert(len(poles), M, "Number of rational functions to be matched") RROMPyAssert(len(coeffs), M, "Number of rational functions to be matched") if M <= 1: return poles, coeffs featDist = distanceMatrix(featPts).flatten() free = list(range(M)) if root is None: #start from sample points closest to each other root = np.argpartition(featDist, M)[M] % M fixed = [free.pop(root)] featDist = featDist.reshape(M, M) for j in range(M - 1, 0, -1): #find closest point idx = np.argmin(featDist[np.ix_(fixed, free)].flatten()) Ifix = fixed[idx // j] fixed += [free.pop(idx % j)] Ifree = fixed[-1] plsfix, plsfree = poles[Ifix], poles[Ifree] resfix, resfree = None, None if matchingWeight != 0: resfix, resfree = coeffs[Ifix][: N].T, coeffs[Ifree][: N].T + if projMapping is not None: + resfix = resfix[projMapping[0]] * resfix[projMapping[1]].conj() + resfree = (resfree[projMapping[0]] + * resfree[projMapping[1]].conj()) if isinstance(projMat, (np.ndarray,)): suppfix, suppfree = supps[Ifix], supps[Ifree] resfix = projMat[:, suppfix : suppfix + len(resfix)].dot( resfix) resfree = projMat[:, suppfree : suppfree + len(resfree)].dot( resfree) #build assignment distance matrix distj = chordalMetricAngleMatrix(plsfix, plsfree, matchingWeight, resfix, resfree, HFEngine, is_state) reordering = pointMatching(distj)[1] poles[Ifree] = poles[Ifree][reordering] coeffs[Ifree][: N] = coeffs[Ifree][reordering] return poles, coeffs