diff --git a/rrompy/parameter/parameter_sampling/__init__.py b/rrompy/parameter/parameter_sampling/__init__.py
index 746a0ae..12e89ee 100644
--- a/rrompy/parameter/parameter_sampling/__init__.py
+++ b/rrompy/parameter/parameter_sampling/__init__.py
@@ -1,33 +1,35 @@
 # Copyright (C) 2018 by the RROMPy authors
 #
 # This file is part of RROMPy.
 #
 # RROMPy is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # RROMPy is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Lesser General Public License for more details.
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
 #
 from .generic_sampler import GenericSampler
 from .fft_sampler import FFTSampler
 from .manual_sampler import ManualSampler
 from .quadrature_sampler import QuadratureSampler
+from .quadrature_sampler_total import QuadratureSamplerTotal
 from .random_sampler import RandomSampler
 
 __all__ = [
         'GenericSampler',
         'FFTSampler',
         'ManualSampler',
         'QuadratureSampler',
+        'QuadratureSamplerTotal',
         'RandomSampler'
           ]
 
diff --git a/rrompy/parameter/parameter_sampling/fft_sampler.py b/rrompy/parameter/parameter_sampling/fft_sampler.py
index a41f44b..8e2eb8d 100644
--- a/rrompy/parameter/parameter_sampling/fft_sampler.py
+++ b/rrompy/parameter/parameter_sampling/fft_sampler.py
@@ -1,51 +1,50 @@
 # Copyright (C) 2018 by the RROMPy authors
 #
 # This file is part of RROMPy.
 #
 # RROMPy is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # RROMPy is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Lesser General Public License for more details.
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
 #
 import numpy as np
 from .generic_sampler import GenericSampler
 from rrompy.utilities.base.types import Np1D, List, paramList
 from rrompy.utilities.base import lowDiscrepancy, kroneckerer
 from rrompy.parameter import checkParameterList
 
 __all__ = ['FFTSampler']
 
 class FFTSampler(GenericSampler):
     """Generator of FFT-type sample points on scaled roots of unity."""
 
     def generatePoints(self, n:List[int], reorder : bool = True) -> paramList:
         """Array of sample points."""
         if not hasattr(n, "__len__"): n = [n]
         super().generatePoints(n)
         nleft, nright = 1, np.prod(n)
         xmat = np.empty((nright, self.npar), dtype = np.complex)
         for d in range(self.npar):
             nright //= n[d]
             a, b = self.lims(0, d), self.lims(1, d)
             if self.scaling is not None:
                 a, b = self.scaling[d](a), self.scaling[d](b)
             c, r = (a + b) / 2., np.abs(a - b) / 2.
             xd = c + r * np.exp(1.j * np.linspace(0, 2 * np.pi, n[d] + 1)[:-1])
-            if reorder:
-                fejerOrdering = lowDiscrepancy(n[d])
-                xd = xd[fejerOrdering]
             if self.scalingInv is not None:
                 xd = self.scalingInv[d](xd)
             xmat[:, d] = kroneckerer(xd, nleft, nright)
             nleft *= n[d]
+        if reorder:
+            xmat = xmat[lowDiscrepancy(np.prod(n)), :]
         x, _ = checkParameterList(xmat, self.npar)
         return x
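The change above moves the low-discrepancy shuffle from the individual 1D rules to the assembled tensor grid. A minimal stand-alone sketch of the two behaviors, assuming kroneckerer(xd, nleft, nright) tiles a 1D rule into one column of the lexicographic tensor grid, and using a random permutation as a stand-in for the actual lowDiscrepancy ordering:

    import numpy as np

    def kroneckerer(xd, nleft, nright):
        # assumed semantics: replicate 1D rule xd into one tensor-grid column
        return np.tile(np.repeat(xd, nright), nleft)

    n = [3, 4]
    nleft, nright = 1, np.prod(n)
    xmat = np.empty((np.prod(n), len(n)), dtype = complex)
    for d in range(len(n)):
        nright //= n[d]
        xd = np.exp(1.j * np.linspace(0, 2 * np.pi, n[d] + 1)[:-1])
        # old behavior: xd = xd[lowDiscrepancy(n[d])], shuffling each rule
        xmat[:, d] = kroneckerer(xd, nleft, nright)
        nleft *= n[d]
    # new behavior: a single permutation of all np.prod(n) assembled points
    xmat = xmat[np.random.permutation(np.prod(n)), :]

Either way every tensor point is generated exactly once; only the ordering of the returned list changes.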
diff --git a/rrompy/parameter/parameter_sampling/quadrature_sampler.py b/rrompy/parameter/parameter_sampling/quadrature_sampler.py
index 0900816..26cfbd1 100644
--- a/rrompy/parameter/parameter_sampling/quadrature_sampler.py
+++ b/rrompy/parameter/parameter_sampling/quadrature_sampler.py
@@ -1,86 +1,87 @@
 # Copyright (C) 2018 by the RROMPy authors
 #
 # This file is part of RROMPy.
 #
 # RROMPy is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # RROMPy is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Lesser General Public License for more details.
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
 #
 import numpy as np
 from .generic_sampler import GenericSampler
-from rrompy.utilities.base.types import Np1D, List, paramList
+from rrompy.utilities.base.types import List, paramList
 from rrompy.utilities.exception_manager import RROMPyException
 from rrompy.utilities.base import lowDiscrepancy, kroneckerer
 from rrompy.parameter import checkParameterList
 
 __all__ = ['QuadratureSampler']
 
 class QuadratureSampler(GenericSampler):
     """Generator of quadrature sample points."""
 
     allowedKinds = ["UNIFORM", "CHEBYSHEV", "GAUSSLEGENDRE"]
 
     def __init__(self, lims:paramList, kind : str = "UNIFORM",
                  scaling : List[callable] = None,
                  scalingInv : List[callable] = None):
         super().__init__(lims = lims, scaling = scaling,
                          scalingInv = scalingInv)
         self.kind = kind
 
     def __str__(self) -> str:
         return "{}_{}".format(super().__str__(), self.kind)
 
     def __repr__(self) -> str:
         return self.__str__() + " at " + hex(id(self))
 
     @property
     def kind(self):
         """Value of kind."""
         return self._kind
     @kind.setter
     def kind(self, kind):
         if kind.upper() not in self.allowedKinds:
             raise RROMPyException("Generator kind not recognized.")
         self._kind = kind.upper()
 
     def generatePoints(self, n:List[int], reorder : bool = True) -> paramList:
         """Array of sample points."""
         if not hasattr(n, "__len__"): n = [n]
         super().generatePoints(n)
         nleft, nright = 1, np.prod(n)
         xmat = np.empty((nright, self.npar), dtype = self.lims.dtype)
         for d in range(self.npar):
             nright //= n[d]
             a, b = self.lims(0, d), self.lims(1, d)
             if self.scaling is not None:
                 a, b = self.scaling[d](a), self.scaling[d](b)
             c, r = (a + b) / 2., (a - b) / 2.
             dAbs = 2. * np.abs(r)
             if self.kind == "UNIFORM":
                 xd = np.linspace(a, b, n[d])
             elif self.kind == "CHEBYSHEV":
                 nodes, _ = np.polynomial.chebyshev.chebgauss(n[d])
                 xd = c + r * nodes
             elif self.kind == "GAUSSLEGENDRE":
                 nodes, _ = np.polynomial.legendre.leggauss(n[d])
                 xd = c + r * nodes[::-1]
-            if len(xd) > 1 and reorder:
-                fejerOrdering = [n[d] - 1] + lowDiscrepancy(n[d] - 1)
-                xd = xd[fejerOrdering]
             if self.scalingInv is not None:
                 xd = self.scalingInv[d](xd)
             xmat[:, d] = kroneckerer(xd, nleft, nright)
             nleft *= n[d]
+        nright = np.prod(n)
+        if nright > 1 and reorder:
+            fejerOrdering = [nright - 1] + list(lowDiscrepancy(nright - 1))
+            xmat = xmat[fejerOrdering, :]
         x, _ = checkParameterList(xmat, self.npar)
         return x
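The same global reordering appears here, with one difference: the last tensor node is pinned to the front before the low-discrepancy permutation of the remaining nright - 1 points. This matters downstream, since QuadratureSamplerTotal below keeps only the first n points of the reordered grid, so this ordering decides which quadrature nodes survive the truncation. A toy sketch, again with a random permutation standing in for lowDiscrepancy:

    import numpy as np

    m = 9                                      # np.prod(n) for a 3x3 grid
    perm = list(np.random.permutation(m - 1))  # stand-in for lowDiscrepancy
    fejerOrdering = [m - 1] + perm             # last tensor node goes first
    xmat = np.arange(2 * m).reshape(m, 2)      # toy two-column grid
    xmat = xmat[fejerOrdering, :]              # row m - 1 is now row 0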
diff --git a/rrompy/parameter/parameter_sampling/quadrature_sampler_total.py b/rrompy/parameter/parameter_sampling/quadrature_sampler_total.py
new file mode 100644
index 0000000..624148d
--- /dev/null
+++ b/rrompy/parameter/parameter_sampling/quadrature_sampler_total.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2018 by the RROMPy authors
+#
+# This file is part of RROMPy.
+#
+# RROMPy is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# RROMPy is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from scipy.special import binom, factorial as fact
+from .quadrature_sampler import QuadratureSampler
+from rrompy.utilities.base.types import paramList
+from rrompy.utilities.base import lowDiscrepancy
+from rrompy.parameter import checkParameterList
+
+__all__ = ['QuadratureSamplerTotal']
+
+class QuadratureSamplerTotal(QuadratureSampler):
+    """
+    Generator of quadrature sample points for total degree polynomial
+        computations.
+    """
+
+    def generatePoints(self, n:int, reorder : bool = True) -> paramList:
+        """Array of sample points."""
+        if hasattr(n, "__len__"): n = n[0]
+        d = self.npar
+        n1d = int((fact(d) * n) ** (1. / d))
+        while binom(n1d + d - 1, d) > n: n1d -= 1
+        x = super().generatePoints([n1d] * d, reorder = True)[list(range(n))]
+        if not reorder:
+            fejerOrderingInv = lowDiscrepancy(n, inverse = True)
+            xmat = x.data[fejerOrderingInv, :]
+            x, _ = checkParameterList(xmat, d)
+        return x
+
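The grid-size selection in the new file can be checked by hand: for d = 2 parameters and n = 10 requested points, the initial guess is n1d = int((2 * 10) ** (1 / 2)) = 4, and binom(4 + 2 - 1, 2) = 10 <= 10 already holds, so a 4x4 tensor grid is generated and its first 10 points in the low-discrepancy ordering are kept. A self-contained sketch of just this selection step:

    from scipy.special import binom, factorial as fact

    def totalDegreeGridSize(n, d):
        # largest n1d with binom(n1d + d - 1, d) <= n, mirroring
        # QuadratureSamplerTotal.generatePoints above
        n1d = int((fact(d) * n) ** (1. / d))
        while binom(n1d + d - 1, d) > n: n1d -= 1
        return n1d

    assert totalDegreeGridSize(10, 2) == 4    # binom(5, 2) = 10
    assert totalDegreeGridSize(20, 3) == 4    # binom(6, 3) = 20

Here binom(n1d + d - 1, d) counts the monomials in d variables of total degree below n1d, i.e. the dimension of the total-degree polynomial space the kept samples are meant to support.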
diff --git a/rrompy/parameter/parameter_sampling/random_sampler.py b/rrompy/parameter/parameter_sampling/random_sampler.py
index cc68c21..a18583d 100644
--- a/rrompy/parameter/parameter_sampling/random_sampler.py
+++ b/rrompy/parameter/parameter_sampling/random_sampler.py
@@ -1,73 +1,76 @@
 # Copyright (C) 2018 by the RROMPy authors
 #
 # This file is part of RROMPy.
 #
 # RROMPy is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # RROMPy is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Lesser General Public License for more details.
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
 #
 import numpy as np
 from .generic_sampler import GenericSampler
+from rrompy.utilities.base.halton import haltonGenerate
 from rrompy.utilities.base.sobol import sobolGenerate
 from rrompy.utilities.base.types import Np1D, List, paramList
 from rrompy.utilities.exception_manager import RROMPyException
 from rrompy.parameter import checkParameterList
 
 __all__ = ['RandomSampler']
 
 class RandomSampler(GenericSampler):
     """Generator of quadrature sample points."""
 
-    allowedKinds = ["UNIFORM", "SOBOL"]
+    allowedKinds = ["UNIFORM", "HALTON", "SOBOL"]
 
     def __init__(self, lims:paramList, kind : str = "UNIFORM",
                  scaling : List[callable] = None,
                  scalingInv : List[callable] = None):
         super().__init__(lims = lims, scaling = scaling,
                          scalingInv = scalingInv)
         self.kind = kind
 
     def __str__(self) -> str:
         return "{}_{}".format(super().__str__(), self.kind)
 
     def __repr__(self) -> str:
         return self.__str__() + " at " + hex(id(self))
 
     @property
     def kind(self):
         """Value of kind."""
         return self._kind
     @kind.setter
     def kind(self, kind):
         if kind.upper() not in self.allowedKinds:
             raise RROMPyException("Generator kind not recognized.")
         self._kind = kind.upper()
 
-    def generatePoints(self, n:int, seed : int = 420) -> paramList:
+    def generatePoints(self, n:int, seed : int = 42) -> paramList:
         """Array of quadrature points."""
         if hasattr(n, "__len__"): n = n[0]
         if self.kind == "UNIFORM":
             np.random.seed(seed)
             xmat = np.random.uniform(size = (n, self.npar))
+        elif self.kind == "HALTON":
+            xmat = haltonGenerate(self.npar, n, seed)
         else:
             xmat = sobolGenerate(self.npar, n, seed)
         for d in range(self.npar):
             a, b = self.lims(0, d), self.lims(1, d)
             if self.scaling is not None:
                 a, b = self.scaling[d](a), self.scaling[d](b)
             xmat[:, d] = a + (b - a) * xmat[:, d]
             if self.scalingInv is not None:
                 xmat[:, d] = self.scalingInv[d](xmat[:, d])
         x, _ = checkParameterList(xmat, self.npar)
         return x
-
+
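The new HALTON branch defers to rrompy.utilities.base.halton.haltonGenerate, whose implementation is not part of this diff; only the (npar, n, seed) call signature is visible. For reference, a generic radical-inverse construction of a Halton point set (independent of RROMPy, with no seed handling) looks like:

    import numpy as np

    def radicalInverse(i, base):
        # reverse the base-`base` digits of i across the radix point
        inv, f = 0., 1. / base
        while i > 0:
            inv += f * (i % base)
            i //= base
            f /= base
        return inv

    def haltonSketch(npar, n):
        # one prime base per parameter; enough for low dimensions
        primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29][:npar]
        return np.array([[radicalInverse(i, p) for p in primes]
                         for i in range(1, n + 1)])

    pts = haltonSketch(2, 5)    # 5 quasi-random points in [0, 1)^2

Like the Sobol branch, the samples land in the unit hypercube and are then mapped to lims by the affine rescaling loop in generatePoints above.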
diff --git a/rrompy/reduction_methods/base/generic_approximant.py b/rrompy/reduction_methods/base/generic_approximant.py
index 2571ce9..7527e3c 100644
--- a/rrompy/reduction_methods/base/generic_approximant.py
+++ b/rrompy/reduction_methods/base/generic_approximant.py
@@ -1,884 +1,890 @@
 # Copyright (C) 2018 by the RROMPy authors
 #
 # This file is part of RROMPy.
 #
 # RROMPy is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # RROMPy is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU Lesser General Public License for more details.
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
 #
 from abc import abstractmethod
 import numpy as np
 from itertools import product as iterprod
 from copy import deepcopy as copy
 from os import remove as osrm
 from rrompy.sampling.linear_problem import (SamplingEngineLinear,
                                             SamplingEngineLinearPOD)
 from rrompy.utilities.base.types import (Np1D, DictAny, HFEng, List, ListAny,
                                          strLst, paramVal, paramList,
                                          sampList)
 from rrompy.utilities.base import purgeDict, verbosityDepth, getNewFilename
 from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert,
                                                 RROMPy_READY, RROMPy_FRAGILE)
 from rrompy.utilities.base import pickleDump, pickleLoad
 from rrompy.parameter import (emptyParameterList, checkParameter,
                               checkParameterList)
 from rrompy.sampling import sampleList, emptySampleList
 
 __all__ = ['GenericApproximant']
 
 def addNormFieldToClass(self, fieldName):
     def objFunc(self, mu:paramList, homogeneized : bool = False) -> float:
         uV = getattr(self.__class__, "get" + fieldName)(self, mu,
                                                         homogeneized)
         val = self.HFEngine.norm(uV)
         return val
     setattr(self.__class__, "norm" + fieldName, objFunc)
 
 def addPlotFieldToClass(self, fieldName):
     def objFunc(self, mu:paramList, name : str = fieldName,
                 save : str = None, what : strLst = 'all',
                 saveFormat : str = "eps", saveDPI : int = 100,
                 show : bool = True, homogeneized : bool = False,
                 **figspecs):
         uV = getattr(self.__class__, "get" + fieldName)(self, mu,
                                                         homogeneized)
         for j, u in enumerate(uV):
             self.HFEngine.plot(u, name = name + str(j), save = save,
                                what = what, saveFormat = saveFormat,
                                saveDPI = saveDPI, show = show, **figspecs)
     setattr(self.__class__, "plot" + fieldName, objFunc)
 
 def addOutParaviewFieldToClass(self, fieldName):
     def objFunc(self, mu:paramVal, name : str = fieldName,
                 filename : str = "out", time : float = 0.,
                 what : strLst = 'all', forceNewFile : bool = True,
                 folder : bool = False, filePW = None,
                 homogeneized : bool = False):
         if not hasattr(self.HFEngine, "outParaview"):
             raise RROMPyException(("High fidelity engine cannot output to "
                                    "Paraview."))
         uV = getattr(self.__class__, "get" + fieldName)(self, mu,
                                                         homogeneized)
         for j, u in enumerate(uV):
             self.HFEngine.outParaview(u, name = name + str(j),
                                       filename = filename, time = time,
                                       what = what,
                                       forceNewFile = forceNewFile,
                                       folder = folder, filePW = filePW)
     setattr(self.__class__, "outParaview" + fieldName, objFunc)
 
 def addOutParaviewTimeDomainFieldToClass(self, fieldName):
     def objFunc(self, mu:paramVal, omega : float = None,
                 timeFinal : float = None, periodResolution : int = 20,
                 name : str = fieldName, filename : str = "out",
                 forceNewFile : bool = True, folder : bool = False,
                 homogeneized : bool = False):
         if not hasattr(self.HFEngine, "outParaviewTimeDomain"):
             raise RROMPyException(("High fidelity engine cannot output to "
                                    "Paraview."))
         uV = getattr(self.__class__, "get" + fieldName)(self, mu,
                                                         homogeneized)
         if omega is None: omega = np.real(mu)
         for j, u in enumerate(uV):
             self.HFEngine.outParaviewTimeDomain(u, omega = omega,
                                             timeFinal = timeFinal,
                                             periodResolution = periodResolution,
                                             name = name + str(j),
                                             filename = filename,
                                             forceNewFile = forceNewFile,
                                             folder = folder)
     setattr(self.__class__, "outParaviewTimeDomain" + fieldName, objFunc)
 
 class GenericApproximant:
     """
     ABSTRACT
     ROM approximant computation for parametric problems.
 
     Args:
         HFEngine: HF problem solver.
         mu0(optional): Default parameter. Defaults to 0.
         approxParameters(optional): Dictionary containing values for main
             parameters of approximant.
Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon. Defaults to empty dict. homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. trainedModel: Trained model evaluator. mu0: Default parameter. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList{Soft,Critical}. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uAppReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedAppReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApp: Approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedApp: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ __all__ += [ftype + dtype for ftype, dtype in iterprod( ["norm", "plot", "outParaview", "outParaviewTimeDomain"], ["HF", "RHS", "Approx", "Res", "Err"])] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._mode = RROMPy_READY self.verbosity = verbosity self.timestamp = timestamp if self.verbosity >= 10: verbosityDepth("INIT", ("Initializing approximant engine of " "type {}.").format(self.name()), timestamp = self.timestamp) self._HFEngine = HFEngine self.trainedModel = None self.lastSolvedHF = emptyParameterList() self.uHF = emptySampleList() self._addParametersToList(["POD"], [True], ["S"], [1]) if mu0 is None: if hasattr(self.HFEngine, "mu0"): self.mu0 = checkParameter(self.HFEngine.mu0) else: raise RROMPyException(("Center of approximation cannot be " "inferred from HF engine. Parameter " "required")) else: self.mu0 = checkParameter(mu0, self.HFEngine.npar) self.resetSamples() self.homogeneized = homogeneized self.approxParameters = approxParameters self._postInit() ### add norm{HF,RHS,Approx,Res,Err} methods """ Compute norm of * at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Target norm of *. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addNormFieldToClass(self, objName) ### add plot{HF,RHS,Approx,Res,Err} methods """ Do some nice plots of * at arbitrary parameter. Args: mu: Target parameter. name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". 
saveDPI(optional): DPI for saved plot(s). Defaults to 100. show(optional): Whether to show figure. Defaults to True. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addPlotFieldToClass(self, objName) ### add outParaview{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file. Args: mu: Target parameter. name(optional): Base name to be used for data output. filename(optional): Name of output file. time(optional): Timestamp. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. filePW(optional): Fenics File entity (for time series). homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewFieldToClass(self, objName) ### add outParaviewTimeDomain{HF,RHS,Approx,Res,Err} methods """ Output * to ParaView file, converted to time domain. Args: mu: Target parameter. omega(optional): frequency. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. """ for objName in ["HF", "RHS", "Approx", "Res", "Err"]: addOutParaviewTimeDomainFieldToClass(self, objName) def _preInit(self): if not hasattr(self, "depth"): self.depth = 0 else: self.depth += 1 @property def parameterList(self): """Value of parameterListSoft + parameterListCritical.""" return self.parameterListSoft + self.parameterListCritical def _addParametersToList(self, whatSoft:strLst, defaultSoft:ListAny, whatCritical : strLst = [], - defaultCritical : ListAny = []): + defaultCritical : ListAny = [], + toBeExcluded : strLst = []): + if not hasattr(self, "parameterToBeExcluded"): + self.parameterToBeExcluded = [] + self.parameterToBeExcluded += toBeExcluded if not hasattr(self, "parameterListSoft"): self.parameterListSoft = [] if not hasattr(self, "parameterDefaultSoft"): self.parameterDefaultSoft = {} if not hasattr(self, "parameterListCritical"): self.parameterListCritical = [] if not hasattr(self, "parameterDefaultCritical"): self.parameterDefaultCritical = {} - self.parameterListSoft += whatSoft for j, what in enumerate(whatSoft): - self.parameterDefaultSoft[what] = defaultSoft[j] - self.parameterListCritical += whatCritical + if what not in self.parameterToBeExcluded: + self.parameterListSoft += [what] + self.parameterDefaultSoft[what] = defaultSoft[j] for j, what in enumerate(whatCritical): - self.parameterDefaultCritical[what] = defaultCritical[j] + if what not in self.parameterToBeExcluded: + self.parameterListCritical += [what] + self.parameterDefaultCritical[what] = defaultCritical[j] def _postInit(self): if self.depth == 0: if self.verbosity >= 10: verbosityDepth("DEL", "Done initializing.", timestamp = self.timestamp) del self.depth else: self.depth -= 1 def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not 
hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEngineLinearPOD else: SamplingEngine = SamplingEngineLinear self.samplingEngine = SamplingEngine(self.HFEngine, verbosity = self.verbosity) @property def HFEngine(self): """Value of HFEngine.""" return self._HFEngine @HFEngine.setter def HFEngine(self, HFEngine): raise RROMPyException("Cannot change HFEngine.") @property def mu0(self): """Value of mu0.""" return self._mu0 @mu0.setter def mu0(self, mu0): mu0 = checkParameter(mu0) if not hasattr(self, "_mu0") or mu0 != self.mu0: self.resetSamples() self._mu0 = mu0 @property def npar(self): """Number of parameters.""" return self.mu0.shape[1] @property def approxParameters(self): """Value of approximant parameters.""" return self._approxParameters @approxParameters.setter def approxParameters(self, approxParams): if not hasattr(self, "approxParameters"): self._approxParameters = {} approxParameters = purgeDict(approxParams, self.parameterList, dictname = self.name() + ".approxParameters", baselevel = 1) keyList = list(approxParameters.keys()) fragile = False for key in self.parameterListCritical: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultCritical[key] getattr(self.__class__, key, None).fset(self, val) fragile = fragile or val is None for key in self.parameterListSoft: if key in keyList: val = approxParameters[key] else: val = getattr(self, "_" + key, None) if val is None: val = self.parameterDefaultSoft[key] getattr(self.__class__, key, None).fset(self, val) if fragile: self._mode = RROMPy_FRAGILE @property def POD(self): """Value of POD.""" return self._POD @POD.setter def POD(self, POD): if hasattr(self, "_POD"): PODold = self.POD else: PODold = -1 self._POD = POD self._approxParameters["POD"] = self.POD if PODold != self.POD: self.samplingEngine = None self.resetSamples() @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): if not hasattr(S, "__len__"): S = [S] if any([s <= 0 for s in S]): raise RROMPyException("S must be positive.") if hasattr(self, "_S") and self._S is not None: Sold = tuple(self.S) else: Sold = -1 self._S = S self._approxParameters["S"] = self.S if Sold != tuple(self.S): self.resetSamples() @property def homogeneized(self): """Value of homogeneized.""" return self._homogeneized @homogeneized.setter def homogeneized(self, homogeneized): if not hasattr(self, "_homogeneized"): self._homogeneized = None if homogeneized != self.homogeneized: self._homogeneized = homogeneized self.resetSamples() @property def trainedModel(self): """Value of trainedModel.""" return self._trainedModel @trainedModel.setter def trainedModel(self, trainedModel): self._trainedModel = trainedModel self.lastSolvedAppReduced = emptyParameterList() self.lastSolvedApp = emptyParameterList() self.uAppReduced = emptySampleList() self.uApp = emptySampleList() def resetSamples(self): if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() else: self.setupSampling() self._mode = RROMPy_READY def plotSamples(self, name : str = "u", save : str = None, what : strLst = 'all', saveFormat : str = "eps", saveDPI : int = 100, **figspecs): """ Do some nice plots of the samples. Args: name(optional): Name to be shown as title of the plots. Defaults to 'u'. what(optional): Which plots to do. If list, can contain 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. 
save(optional): Where to save plot(s). Defaults to None, i.e. no saving. saveFormat(optional): Format for saved plot(s). Defaults to "eps". saveDPI(optional): DPI for saved plot(s). Defaults to 100. figspecs(optional key args): Optional arguments for matplotlib figure creation. """ RROMPyAssert(self._mode, message = "Cannot plot samples.") self.samplingEngine.plotSamples(name = name, save = save, what = what, saveFormat = saveFormat, saveDPI = saveDPI, **figspecs) def outParaviewSamples(self, name : str = "u", filename : str = "out", times : Np1D = None, what : strLst = 'all', forceNewFile : bool = True, folders : bool = False, filePW = None): """ Output samples to ParaView file. Args: name(optional): Base name to be used for data output. filename(optional): Name of output file. times(optional): Timestamps. what(optional): Which plots to do. If list, can contain 'MESH', 'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'. Defaults to 'ALL'. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. filePW(optional): Fenics File entity (for time series). """ RROMPyAssert(self._mode, message = "Cannot output samples.") self.samplingEngine.outParaviewSamples(name = name, filename = filename, times = times, what = what, forceNewFile = forceNewFile, folders = folders, filePW = filePW) def outParaviewTimeDomainSamples(self, omegas : Np1D = None, timeFinal : Np1D = None, periodResolution : int = 20, name : str = "u", filename : str = "out", forceNewFile : bool = True, folders : bool = False): """ Output samples to ParaView file, converted to time domain. Args: omegas(optional): frequencies. timeFinal(optional): final time of simulation. periodResolution(optional): number of time steps per period. name(optional): Base name to be used for data output. filename(optional): Name of output file. forceNewFile(optional): Whether to create new output file. folders(optional): Whether to split output in folders. """ RROMPyAssert(self._mode, message = "Cannot output samples.") self.samplingEngine.outParaviewTimeDomainSamples(omegas = omegas, timeFinal = timeFinal, periodResolution = periodResolution, name = name, filename = filename, forceNewFile = forceNewFile, folders = folders) def setTrainedModel(self, model): """Deepcopy approximation from trained model.""" if hasattr(model, "storeTrainedModel"): verb = model.verbosity model.verbosity = 0 fileOut = model.storeTrainedModel() model.verbosity = verb else: try: fileOut = getNewFilename("trained_model", "pkl") pickleDump(model.data.__dict__, fileOut) except: raise RROMPyException(("Failed to store model data. Parameter " "model must have either " "storeTrainedModel or " "data.__dict__ properties.")) self.loadTrainedModel(fileOut) osrm(fileOut) @abstractmethod def setupApprox(self): """ Setup approximant. (ABSTRACT) Any specialization should include something like if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") ... self.trainedModel = ... self.trainedModel.data = ... self.trainedModel.data.approxParameters = copy( self.approxParameters) """ pass def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. 
""" return self._mode == RROMPy_FRAGILE or (self.trainedModel is not None and self.trainedModel.data.approxParameters == self.approxParameters) def setHF(self, muHF:paramList, uHF:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" newSolvedHF, _ = checkParameterList(muHF, self.npar) newuHF = sampleList(uHF) if append: self.lastSolvedHF.append(newSolvedHF) self.uHF.append(newuHF) return list(range(len(self.uHF) - len(newuHF), len(self.uHF))) self.lastSolvedHF, _ = checkParameterList(newSolvedHF, self.npar) self.uHF = sampleList(newuHF) return list(range(len(self.uHF))) def evalHF(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Find high fidelity solution with original parameters and arbitrary parameter. Args: mu: Target parameter. append(optional): Whether to append new HF solutions to old ones. prune(optional): Whether to remove duplicates of already appearing HF solutions. """ mu, _ = checkParameterList(mu, self.npar) idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muKeep = emptyParameterList() muExtra = copy(muKeep) for j in range(len(mu)): jPos = self.lastSolvedHF.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: idx[~jExtra] = self.setHF(muKeep, self.uHF[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu if len(muExtra) > 0: newuHFs = self.samplingEngine.solveLS(muExtra, homogeneized = self.homogeneized) idx[jExtra] = self.setHF(muExtra, newuHFs, append) return list(idx) def setApproxReduced(self, muApp:paramList, uApp:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" newSolvedApp, _ = checkParameterList(muApp, self.npar) newuApp = sampleList(uApp) if append: self.lastSolvedAppReduced.append(newSolvedApp) self.uAppReduced.append(newuApp) return list(range(len(self.uAppReduced) - len(newuApp), len(self.uAppReduced))) self.lastSolvedAppReduced, _ = checkParameterList(newSolvedApp, self.npar) self.uAppReduced = sampleList(newuApp) return list(range(len(self.uAppReduced))) def evalApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. 
""" self.setupApprox() mu, _ = checkParameterList(mu, self.npar) idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muKeep = emptyParameterList() muExtra = copy(muKeep) for j in range(len(mu)): jPos = self.lastSolvedAppReduced.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: idx[~jExtra] = self.setApproxReduced(muKeep, self.uAppReduced[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu if len(muExtra) > 0: newuApps = self.trainedModel.getApproxReduced(muExtra) idx[jExtra] = self.setApproxReduced(muExtra, newuApps, append) return list(idx) def setApprox(self, muApp:paramList, uApp:sampleList, append : bool = False) -> List[int]: """Assign high fidelity solution.""" newSolvedApp, _ = checkParameterList(muApp, self.npar) newuApp = sampleList(uApp) if append: self.lastSolvedApp.append(newSolvedApp) self.uApp.append(newuApp) return list(range(len(self.uApp) - len(newuApp), len(self.uApp))) self.lastSolvedApp, _ = checkParameterList(newSolvedApp, self.npar) self.uApp = sampleList(newuApp) return list(range(len(self.uApp))) def evalApprox(self, mu:paramList, append : bool = False, prune : bool = True) -> List[int]: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. """ self.setupApprox() mu, _ = checkParameterList(mu, self.npar) idx = np.empty(len(mu), dtype = np.int) if prune: jExtra = np.zeros(len(mu), dtype = bool) muKeep = emptyParameterList() muExtra = copy(muKeep) for j in range(len(mu)): jPos = self.lastSolvedApp.find(mu[j]) if jPos is not None: idx[j] = jPos muKeep.append(mu[j]) else: jExtra[j] = True muExtra.append(mu[j]) if len(muKeep) > 0 and not append: idx[~jExtra] = self.setApprox(muKeep, self.uApp[idx[~jExtra]], append) append = True else: jExtra = np.ones(len(mu), dtype = bool) muExtra = mu if len(muExtra) > 0: newuApps = self.trainedModel.getApprox(muExtra) idx[jExtra] = self.setApprox(muExtra, newuApps, append) return list(idx) def getHF(self, mu:paramList, homogeneized : bool = False, append : bool = False, prune : bool = True) -> sampList: """ Get HF solution at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: HFsolution. """ mu, _ = checkParameterList(mu, self.npar) idx = self.evalHF(mu, append = append, prune = prune) uHFs = self.uHF(idx) if self.homogeneized and not homogeneized: for j, m in enumerate(mu): uHFs[j] += self.HFEngine.liftDirichletData(m) if not self.homogeneized and homogeneized: for j, m in enumerate(mu): uHFs[j] -= self.HFEngine.liftDirichletData(m) return uHFs def getRHS(self, mu:paramList, homogeneized : bool = False) -> sampList: """ Get linear system RHS at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Linear system RHS. """ return self.HFEngine.residual(None, mu, homogeneized = homogeneized) def getApproxReduced(self, mu:paramList, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. Returns: Reduced approximant. 
""" mu, _ = checkParameterList(mu, self.npar) idx = self.evalApproxReduced(mu, append = append, prune = prune) uAppRs = self.uAppReduced(idx) return uAppRs def getApprox(self, mu:paramList, homogeneized : bool = False, append : bool = False, prune : bool = True) -> sampList: """ Get approximant at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Approximant. """ mu, _ = checkParameterList(mu, self.npar) idx = self.evalApprox(mu, append = append, prune = prune) uApps = self.uApp(idx) if self.homogeneized and not homogeneized: for j, m in enumerate(mu): uApps[j] += self.HFEngine.liftDirichletData(m) if not self.homogeneized and homogeneized: for j, m in enumerate(mu): uApps[j] -= self.HFEngine.liftDirichletData(m) return uApps def getRes(self, mu:paramList, homogeneized : bool = False) -> sampList: """ Get residual at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Approximant residual. """ return self.HFEngine.residual(self.getApprox(mu, homogeneized), mu, homogeneized = homogeneized) def getErr(self, mu:paramList, homogeneized : bool = False) -> sampList: """ Get error at arbitrary parameter. Args: mu: Target parameter. homogeneized(optional): Whether to remove Dirichlet BC. Defaults to False. Returns: Approximant error. """ return self.getApprox(mu, homogeneized) - self.getHF(mu, homogeneized) def getPoles(self) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ self.setupApprox() if self.verbosity >= 20: verbosityDepth("INIT", "Computing poles of model.", timestamp = self.timestamp) poles = self.trainedModel.getPoles() if self.verbosity >= 20: verbosityDepth("DEL", "Done computing poles.", timestamp = self.timestamp) return poles def storeTrainedModel(self, filenameBase : str = "trained_model", forceNewFile : bool = True) -> str: """Store trained reduced model to file.""" self.setupApprox() if self.verbosity >= 20: verbosityDepth("INIT", "Storing trained model to file.", timestamp = self.timestamp) if forceNewFile: filename = getNewFilename(filenameBase, "pkl") else: filename = "{}.pkl".format(filenameBase) pickleDump(self.trainedModel.data.__dict__, filename) if self.verbosity >= 20: verbosityDepth("DEL", "Done storing trained model.", timestamp = self.timestamp) return filename def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" if self.verbosity >= 20: verbosityDepth("INIT", "Loading pre-trained model from file.", timestamp = self.timestamp) datadict = pickleLoad(filename) name = datadict.pop("name") if name == "TrainedModelPade": from rrompy.reduction_methods.trained_model import \ TrainedModelPade as tModel elif name == "TrainedModelRB": from rrompy.reduction_methods.trained_model import \ TrainedModelRB as tModel else: raise RROMPyException(("Trained model name not recognized. 
" "Loading failed.")) self.mu0 = datadict.pop("mu0") from rrompy.reduction_methods.trained_model import TrainedModelData trainedModel = tModel() trainedModel.verbosity = self.verbosity trainedModel.timestamp = self.timestamp data = TrainedModelData(name, self.mu0, datadict.pop("projMat"), datadict.pop("rescalingExp")) if "mus" in datadict: data.mus = datadict.pop("mus") approxParameters = datadict.pop("approxParameters") data.approxParameters = copy(approxParameters) if "sampler" in approxParameters: self._approxParameters["sampler"] = approxParameters.pop("sampler") self.approxParameters = copy(approxParameters) if "mus" in data.__dict__: self.mus = copy(data.mus) if name == "TrainedModelPade": self.scaleFactor = datadict.pop("scaleFactor") data.scaleFactor = self.scaleFactor for key in datadict: setattr(data, key, datadict[key]) trainedModel.data = data self.trainedModel = trainedModel self._mode = RROMPy_FRAGILE if self.verbosity >= 20: verbosityDepth("DEL", "Done loading pre-trained model.", timestamp = self.timestamp) diff --git a/rrompy/reduction_methods/centered/rational_pade.py b/rrompy/reduction_methods/centered/rational_pade.py index cc086a2..20ea707 100644 --- a/rrompy/reduction_methods/centered/rational_pade.py +++ b/rrompy/reduction_methods/centered/rational_pade.py @@ -1,414 +1,434 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from rrompy.reduction_methods.trained_model import (TrainedModelData, TrainedModelPade as tModel) from .generic_centered_approximant import GenericCenteredApproximant from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, DictAny, HFEng, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityDepth from rrompy.utilities.poly_fitting.polynomial import (nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalPade'] class RationalPade(GenericCenteredApproximant): """ ROM single-point fast Pade' approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'M': degree of Pade' approximant numerator; defaults to 0; - 'N': degree of Pade' approximant denominator; defaults to 0; - 'robustTol': tolerance for robust Pade' denominator management; defaults to 0. Defaults to empty dict. homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. 
Attributes: HFEngine: HF problem solver. mu0: Default parameter. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'M': degree of Pade' approximant numerator; - 'N': degree of Pade' approximant denominator; - 'robustTol': tolerance for robust Pade' denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon. POD: Whether to compute QR factorization of derivatives. S: Number of solution snapshots over which current approximant is based upon. M: Numerator degree of approximant. N: Denominator degree of approximant. robustTol: Tolerance for robust Pade' denominator management. E: Complete derivative depth level of S. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uAppReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedAppReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApp: Approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedApp: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. G: Square Numpy 2D vector of size (N+1) corresponding to Pade' denominator matrix (see paper). """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() - self._addParametersToList(["M", "N", "robustTol"], [0, 0, 0]) + self._addParametersToList(["M", "N", "E", "robustTol"], [0, 0, -1, 0]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def M(self): """Value of M..""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M - if hasattr(self, "E") and self.E < self.M: - RROMPyWarning("Prescribed S is too small. Decreasing M.") + if hasattr(self, "_E") and self.E < self.M: + RROMPyWarning("Prescribed E is too small. Decreasing M.") self.M = self.E @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N - if hasattr(self, "E") and self.E < self.N: - RROMPyWarning("Prescribed S is too small. Decreasing N.") + if hasattr(self, "_E") and self.E < self.N: + RROMPyWarning("Prescribed E is too small. Decreasing N.") self.N = self.E + @property + def E(self): + """Value of E.""" + return self._E + @E.setter + def E(self, E): + if E < 0: + if not hasattr(self, "_S"): + raise RROMPyException(("Value of E must be positive if S is " + "not yet assigned.")) + E = np.sum(hashI(np.prod(self.S), self.npar)) - 1 + self._E = E + self._approxParameters["E"] = self.E + if (hasattr(self, "_S") + and self.E >= np.sum(hashI(np.prod(self.S), self.npar))): + RROMPyWarning("Prescribed S is too small. 
Decreasing E.") + self.E = -1 + if hasattr(self, "_M"): self.M = self.M + if hasattr(self, "_N"): self.N = self.N + @property def robustTol(self): """Value of tolerance for robust Pade' denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): GenericCenteredApproximant.S.fset(self, S) - self.E = np.sum(hashI(np.prod(self.S), self.npar)) - 1 if hasattr(self, "_M"): self.M = self.M if hasattr(self, "_N"): self.N = self.N + if hasattr(self, "_E"): self.E = self.E def _setupDenominator(self): """Compute Pade' denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of denominator.", timestamp = self.timestamp) while self.N > 0: if self.POD: ev, eV = self.findeveVGQR() else: ev, eV = self.findeveVGExplicit() newParameters = checkRobustTolerance(ev, self.N, self.robustTol) if not newParameters: break self.approxParameters = newParameters if self.N <= 0: eV = np.ones((1, 1)) q = np.zeros(tuple([self.N + 1] * self.npar), dtype = np.complex) for j in range(eV.shape[0]): q[tuple(hashI(j, self.npar))] = eV[j, 0] if self.verbosity >= 7: verbosityDepth("DEL", "Done computing denominator.", timestamp = self.timestamp) return q def _setupNumerator(self): """Compute Pade' numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of numerator.", timestamp = self.timestamp) P = np.zeros(tuple([self.M + 1] * self.npar) + (np.prod(self.S),), dtype = np.complex) mEnd = hashD([self.M + 1] + [0] * (self.npar - 1)) nEnd = hashD([self.N + 1] + [0] * (self.npar - 1)) mnIdxs = nextDerivativeIndices([], self.npar, max(mEnd, nEnd)) for j in range(mEnd): mIdx = mnIdxs[j] for n in range(nEnd): diffIdx = [x - y for (x, y) in zip(mIdx, mnIdxs[n])] if all([x >= 0 for x in diffIdx]): P[tuple(mIdx) + (hashD(diffIdx),)] = ( self.trainedModel.data.Q[tuple(mnIdxs[n])]) return self.rescaleByParameter(P).T def setupApprox(self): """ Compute Pade' approximant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". 
format(self.name()), timestamp = self.timestamp) self.computeDerivatives() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, None, self.HFEngine.rescalingExp) data.polytype = "MONOMIAL" self.trainedModel.data = data else: self.trainedModel = self.trainedModel if self.N > 0: Q = self._setupDenominator() else: self.setScaleParameter() Q = np.ones(1, dtype = np.complex) self.trainedModel.data.Q = copy(Q) self.trainedModel.data.scaleFactor = self.scaleFactor self.trainedModel.data.projMat = copy(self.samplingEngine.samples( list(range(np.prod(self.S))))) P = self._setupNumerator() if self.POD: P = np.tensordot(self.samplingEngine.RPOD, P, axes = ([-1], [0])) self.trainedModel.data.P = copy(P) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def setScaleParameter(self) -> Np2D: """Compute parameter for rescaling.""" RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.") self.computeDerivatives() self.scaleFactor = [1.] * self.npar for d in range(self.npar): hashesd = [0] for n in range(1, self.E + 1): hashesd += [hashD([0] * (d - 1) + [n] + [0] * (self.npar - d - 1))] if self.POD: Rd = self.samplingEngine.RPOD[: hashesd[-1] + 1, hashesd] Gd = np.diag(Rd.T.conj().dot(Rd)) else: DerEd = self.samplingEngine.samples(hashesd) Gd = self.HFEngine.norm(DerEd) scaleCoeffs = np.polyfit(np.arange(len(Gd)), np.log(Gd), 1) self.scaleFactor[d] = np.exp(- scaleCoeffs[0] / 2.) def rescaleByParameter(self, R:Np2D) -> Np2D: """ Rescale by scale parameter. Args: R: Matrix whose columns need rescaling. Returns: Rescaled matrix. """ RIdxs = nextDerivativeIndices([], self.npar, R.shape[-1]) Rscaled = copy(R) for j, RIdx in enumerate(RIdxs): Rscaled[..., j] *= np.prod([x ** y for (x, y) in zip(self.scaleFactor, RIdx)]) return Rscaled def buildG(self): """Assemble Pade' denominator matrix.""" RROMPyAssert(self._mode, message = "Cannot compute G matrix.") self.computeDerivatives() if self.verbosity >= 10: verbosityDepth("INIT", "Building gramian matrix.", timestamp = self.timestamp) eStart = hashD([self.E] + [0] * (self.npar - 1)) eEnd = hashD([self.E + 1] + [0] * (self.npar - 1)) eIdxs = [hashI(e, self.npar) for e in range(eStart, eEnd)] nEnd = hashD([self.N + 1] + [0] * (self.npar - 1)) nIdxs = nextDerivativeIndices([], self.npar, nEnd) self.setScaleParameter() if self.POD: RPODE = self.rescaleByParameter(self.samplingEngine.RPOD[: eEnd, : eEnd]) else: DerE = self.rescaleByParameter(self.samplingEngine.samples( list(range(eEnd))).data) self.G = np.zeros((nEnd, nEnd), dtype = np.complex) for eIdx in eIdxs: nLoc = [] samplesIdxs = [] for n, nIdx in enumerate(nIdxs): diffIdx = [x - y for (x, y) in zip(eIdx, nIdx)] if all([x >= 0 for x in diffIdx]): nLoc += [n] samplesIdxs += [hashD(diffIdx)] if self.POD: RPODELoc = RPODE[: samplesIdxs[-1] + 1, samplesIdxs] GLoc = RPODELoc.T.conj().dot(RPODELoc) else: DerELoc = DerE[:, samplesIdxs] GLoc = self.HFEngine.innerProduct(DerELoc, DerELoc) for j in range(len(nLoc)): self.G[nLoc[j], nLoc] = self.G[nLoc[j], nLoc] + GLoc[j] if self.verbosity >= 10: verbosityDepth("DEL", "Done building gramian.", timestamp = self.timestamp) def findeveVGExplicit(self) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of Pade' denominator matrix. 
""" RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self.buildG() if self.verbosity >= 7: verbosityDepth("INIT", "Solving eigenvalue problem for gramian matrix.", timestamp = self.timestamp) ev, eV = np.linalg.eigh(self.G) if self.verbosity >= 5: try: condev = ev[-1] / ev[0] except: condev = np.inf verbosityDepth("MAIN", ("Solved eigenvalue problem of size {} " "with condition number {:.4e}.").format( self.G.shape[0], condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return ev, eV def findeveVGQR(self) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of Pade' denominator matrix through SVD of R factor. Returns: Eigenvalues in ascending order and corresponding eigenvector matrix. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") RROMPyAssert(self.POD, True, "POD value") self.computeDerivatives() eStart = hashD([self.E] + [0] * (self.npar - 1)) eEnd = hashD([self.E + 1] + [0] * (self.npar - 1)) eIdxs = [hashI(e, self.npar) for e in range(eStart, eEnd)] nEnd = hashD([self.N + 1] + [0] * (self.npar - 1)) nIdxs = nextDerivativeIndices([], self.npar, nEnd) self.setScaleParameter() RPODE = self.rescaleByParameter(self.samplingEngine.RPOD[: eEnd, : eEnd]) Rstack = np.zeros((RPODE.shape[0] * (eEnd - eStart), nEnd), dtype = np.complex) for k, eIdx in enumerate(eIdxs): nLoc = [] samplesIdxs = [] for n, nIdx in enumerate(nIdxs): diffIdx = [x - y for (x, y) in zip(eIdx, nIdx)] if all([x >= 0 for x in diffIdx]): nLoc += [n] samplesIdxs += [hashD(diffIdx)] RPODELoc = RPODE[:, samplesIdxs] for j in range(len(nLoc)): Rstack[k * RPODE.shape[0] : (k + 1) * RPODE.shape[0], nLoc[j]] = RPODELoc[:, j] if self.verbosity >= 7: verbosityDepth("INIT", ("Solving svd for square root of " "gramian matrix."), timestamp = self.timestamp) sizeI = Rstack.shape _, s, V = np.linalg.svd(Rstack, full_matrices = False) eV = V[::-1, :].T.conj() if self.verbosity >= 5: try: condev = s[0] / s[-1] except: condev = np.inf verbosityDepth("MAIN", ("Solved svd problem of size {} x {} with " "condition number {:.4e}.").format(*sizeI, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return s[::-1], eV def centerNormalize(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.mu0. Returns: Normalized parameter. """ return self.trainedModel.centerNormalize(mu, mu0) def getResidues(self) -> sampList: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues() diff --git a/rrompy/reduction_methods/distributed/rational_interpolant.py b/rrompy/reduction_methods/distributed/rational_interpolant.py index 82ff842..d4f0218 100644 --- a/rrompy/reduction_methods/distributed/rational_interpolant.py +++ b/rrompy/reduction_methods/distributed/rational_interpolant.py @@ -1,508 +1,528 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. 
# # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.base import checkRobustTolerance from .generic_distributed_approximant import GenericDistributedApproximant from rrompy.utilities.poly_fitting import customFit, customPInv from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname, nextDerivativeIndices, hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI, homogeneizedpolyvander) from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, DictAny, Tuple, List, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityDepth, multifactorial from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) __all__ = ['RationalInterpolant'] class RationalInterpolant(GenericDistributedApproximant): """ ROM rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator; - 'polybasis': type of polynomial basis for interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'M': degree of Pade' interpolant numerator; defaults to 0; - 'N': degree of Pade' interpolant denominator; defaults to 0; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust Pade' denominator management; defaults to 0. Defaults to empty dict. homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'polybasis': type of polynomial basis for interpolation; - 'M': degree of Pade' interpolant numerator; - 'N': degree of Pade' interpolant denominator; - 'interpRcond': tolerance for interpolation via numpy.polyfit; - 'robustTol': tolerance for robust Pade' denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. POD: Whether to compute POD of snapshots. S: Number of solution snapshots over which current approximant is based upon. sampler: Sample point generator. polybasis: type of polynomial basis for interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. interpRcond: Tolerance for interpolation via numpy.polyfit. 
robustTol: Tolerance for robust Pade' denominator management. E: Complete derivative depth level of S. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uAppReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedAppReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApp: Approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedApp: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D matrix whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() - self._addParametersToList(["polybasis", "M", "N", "interpRcond", - "robustTol"], ["MONOMIAL", 0, 0, -1, 0]) + self._addParametersToList(["polybasis", "M", "N", "E", "interpRcond", + "robustTol"], ["MONOMIAL", 0, 0, -1, -1, 0]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def interpRcond(self): """Value of interpRcond.""" return self._interpRcond @interpRcond.setter def interpRcond(self, interpRcond): self._interpRcond = interpRcond self._approxParameters["interpRcond"] = self.interpRcond @property def M(self): """Value of M.""" return self._M @M.setter def M(self, M): if M < 0: raise RROMPyException("M must be non-negative.") self._M = M self._approxParameters["M"] = self.M - if hasattr(self, "E") and self.E < self.M: - RROMPyWarning("Prescribed S is too small. Decreasing M.") + if hasattr(self, "_E") and self.E < self.M: + RROMPyWarning("Prescribed E is too small. Decreasing M.") self.M = self.E @property def N(self): """Value of N.""" return self._N @N.setter def N(self, N): if N < 0: raise RROMPyException("N must be non-negative.") self._N = N self._approxParameters["N"] = self.N - if hasattr(self, "E") and self.E < self.N: - RROMPyWarning("Prescribed S is too small. Decreasing N.") + if hasattr(self, "_E") and self.E < self.N: + RROMPyWarning("Prescribed E is too small. Decreasing N.") self.N = self.E + @property + def E(self): + """Value of E.""" + return self._E + @E.setter + def E(self, E): + if E < 0: + if not hasattr(self, "_S"): + raise RROMPyException(("Value of E must be non-negative if S is " + "not yet assigned.")) + E = np.sum(hashI(np.prod(self.S), self.npar)) - 1 + self._E = E + self._approxParameters["E"] = self.E + if (hasattr(self, "_S") + and self.E >= np.sum(hashI(np.prod(self.S), self.npar))): + RROMPyWarning("Prescribed S is too small. Decreasing E.") + self.E = -1 + if hasattr(self, "_M"): self.M = self.M + if hasattr(self, "_N"): self.N = self.N +
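# Note: a minimal sketch of the default value of E computed above, under the
# assumption (consistent with its usage here) that hashI(j, npar) returns the
# j-th derivative multi-index in total-degree ordering; the values below are
# illustrative only and not part of the patch.
#
#     import numpy as np
#     from rrompy.utilities.poly_fitting.polynomial import (
#                                               hashIdxToDerivative as hashI)
#     S, npar = [4, 4], 2     # 16 snapshots on a 2D tensor grid
#     E = np.sum(hashI(np.prod(S), npar)) - 1     # E == 4
#     # i.e. the 15 derivative indices of total degree <= 4 form the largest
#     # complete total-degree set supported by the 16 available snapshots.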
Decreasing E.") + self.E = -1 + if hasattr(self, "_M"): self.M = self.M + if hasattr(self, "_N"): self.N = self.N + @property def robustTol(self): """Value of tolerance for robust Pade' denominator management.""" return self._robustTol @robustTol.setter def robustTol(self, robustTol): if robustTol < 0.: RROMPyWarning(("Overriding prescribed negative robustness " "tolerance to 0.")) robustTol = 0. self._robustTol = robustTol self._approxParameters["robustTol"] = self.robustTol @property def S(self): """Value of S.""" return self._S @S.setter def S(self, S): GenericDistributedApproximant.S.fset(self, S) - self.E = np.sum(hashI(np.prod(self.S), self.npar)) - 1 if hasattr(self, "_M"): self.M = self.M if hasattr(self, "_N"): self.N = self.N + if hasattr(self, "_E"): self.E = self.E def resetSamples(self): """Reset samples.""" super().resetSamples() self._musUniqueCN = None self._derIdxs = None self._reorder = None def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if self._musUniqueCN is None or len(self._reorder) != len(self.mus): self._musUniqueCN, musIdxsTo, musIdxs, musCount = ( self.centerNormalize(self.mus).unique(return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.mus[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.mus.shape[1], cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupDenominator(self): """Compute Pade' denominator.""" RROMPyAssert(self._mode, message = "Cannot setup denominator.") if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of denominator.", timestamp = self.timestamp) while self.N > 0: invD = self._computeInterpolantInverseBlocks() if self.POD: ev, eV = self.findeveVGQR(self.samplingEngine.RPOD, invD) else: ev, eV = self.findeveVGExplicit(self.samplingEngine.samples, invD) newParams = checkRobustTolerance(ev, self.N, self.robustTol) if not newParams: break self.approxParameters = newParams if self.N <= 0: self._N = 0 eV = np.ones((1, 1)) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing denominator.", timestamp = self.timestamp) q = np.zeros(tuple([self.N + 1] * self.npar), dtype = eV.dtype) for j in range(eV.shape[0]): q[tuple(hashI(j, self.npar))] = eV[j, 0] return q def _setupNumerator(self): """Compute Pade' numerator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") if self.verbosity >= 7: verbosityDepth("INIT", "Starting computation of numerator.", timestamp = self.timestamp) Qevaldiag = np.zeros((len(self.mus), len(self.mus)), dtype = np.complex) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 self._setupInterpolationIndices() idxGlob = 0 for j, derIdxs in enumerate(self._derIdxs): nder = len(derIdxs) idxLoc = np.arange(len(self.mus))[(self._reorder >= idxGlob) * (self._reorder < idxGlob + nder)] idxGlob += nder Qval = [0] * nder for der in range(nder): derIdx = hashI(der, self.npar) Qval[der] = (self.trainedModel.getQVal( self._musUnique[j], derIdx, scl = np.power(self.scaleFactor, -1.)) / multifactorial(derIdx)) for derU, derUIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): diffIdx = [x - y for (x, y) in zip(derUIdx, derQIdx)] if all([x >= 0 for x in diffIdx]): diffj = hashD(diffIdx) Qevaldiag[idxLoc[derU], 
idxLoc[derQ]] = Qval[diffj] self.trainedModel.verbosity = verb while self.M >= 0: fitVander, _, argIdxs = homogeneizedpolyvander(self._musUniqueCN, self.M, self.polybasis, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) fitVander = fitVander[:, argIdxs] fitOut = customFit(fitVander, Qevaldiag, full = True, rcond = self.interpRcond) if self.verbosity >= 5: condfit = fitOut[1][2][0] / fitOut[1][2][-1] verbosityDepth("MAIN", ("Fitting {} samples with degree {} " "through {}... Conditioning of LS " "system: {:.4e}.").format( fitVander.shape[0], self.M, polyfitname(self.polybasis), condfit), timestamp = self.timestamp) if fitOut[1][1] == fitVander.shape[1]: P = fitOut[0] break RROMPyWarning("Polyfit is poorly conditioned. Reducing M by 1.") self.M -= 1 if self.M < 0: raise RROMPyException(("Instability in computation of numerator. " "Aborting.")) if self.verbosity >= 7: verbosityDepth("DEL", "Done computing numerator.", timestamp = self.timestamp) p = np.zeros(tuple([self.M + 1] * self.npar) + (P.shape[1],), dtype = P.dtype) for j in range(P.shape[0]): p[tuple(hashI(j, self.npar))] = P[j, :] return p.T def setupApprox(self): """ Compute Pade' interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeScaleFactor() self.computeSnapshots() if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, self.samplingEngine.samples, self.HFEngine.rescalingExp) data.polytype = self.polybasis data.scaleFactor = self.scaleFactor data.mus = copy(self.mus) self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(self.samplingEngine.samples) if self.N > 0: Q = self._setupDenominator() else: Q = np.ones(1, dtype = np.complex) self.trainedModel.data.Q = copy(Q) P = self._setupNumerator() if self.POD: P = np.tensordot(self.samplingEngine.RPOD, P, axes = ([-1], [0])) self.trainedModel.data.P = copy(P) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", timestamp = self.timestamp) def _computeInterpolantInverseBlocks(self) -> List[Np2D]: """ Compute inverse factors for minimal interpolant target functional. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") self._setupInterpolationIndices() eWidth = (hashD([self.E + 1] + [0] * (self.npar - 1)) - hashD([self.E] + [0] * (self.npar - 1))) TE, _, argIdxs = homogeneizedpolyvander(self._musUniqueCN, self.E, self.polybasis, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) fitOut = customPInv(TE[:, argIdxs], rcond = self.interpRcond, full = True) self._fitinv = fitOut[0][- eWidth : , :] if self.verbosity >= 5: condfit = fitOut[1][1][0] / fitOut[1][1][-1] verbosityDepth("MAIN", ("Fitting {} samples with degree {} " "through {}... 
Conditioning of " "pseudoinverse system: {:.4e}.").format( TE.shape[0], self.E, polyfitname(self.polybasis), condfit), timestamp = self.timestamp) TN, _, argIdxs = homogeneizedpolyvander(self._musUniqueCN, self.N, self.polybasis, self._derIdxs, self._reorder, scl = np.power(self.scaleFactor, -1.)) TN = TN[:, argIdxs] invD = [None] * (eWidth) for k in range(eWidth): pseudoInv = np.diag(self._fitinv[k, :]) idxGlob = 0 for j, derIdxs in enumerate(self._derIdxs): nder = len(derIdxs) idxGlob += nder if nder > 1: idxLoc = np.arange(len(self.mus))[ (self._reorder >= idxGlob - nder) * (self._reorder < idxGlob)] invLoc = self._fitinv[k, idxLoc] pseudoInv[np.ix_(idxLoc, idxLoc)] = 0. for diffj, diffjIdx in enumerate(derIdxs): for derQ, derQIdx in enumerate(derIdxs): derUIdx = [x - y for (x, y) in zip(diffjIdx, derQIdx)] if all([x >= 0 for x in derUIdx]): derU = hashD(derUIdx) pseudoInv[idxLoc[derU], idxLoc[derQ]] = ( invLoc[diffj]) invD[k] = pseudoInv.dot(TN) return invD def findeveVGExplicit(self, sampleE:sampList, invD:List[Np2D]) -> Tuple[Np1D, Np2D]: """ Compute explicitly eigenvalues and eigenvectors of Pade' denominator matrix. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") nEnd = invD[0].shape[1] eWidth = len(invD) if self.verbosity >= 10: verbosityDepth("INIT", "Building gramian matrix.", timestamp = self.timestamp) gramian = self.HFEngine.innerProduct(sampleE, sampleE) G = np.zeros((nEnd, nEnd), dtype = np.complex) for k in range(eWidth): G += invD[k].T.conj().dot(gramian.dot(invD[k])) if self.verbosity >= 10: verbosityDepth("DEL", "Done building gramian.", timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("INIT", ("Solving eigenvalue problem for " "gramian matrix."), timestamp = self.timestamp) ev, eV = np.linalg.eigh(G) if self.verbosity >= 5: try: condev = ev[-1] / ev[0] except: condev = np.inf verbosityDepth("MAIN", ("Solved eigenvalue problem of " "size {} with condition number " "{:.4e}.").format(nEnd, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving eigenvalue problem.", timestamp = self.timestamp) return ev, eV def findeveVGQR(self, RPODE:Np2D, invD:List[Np2D]) -> Tuple[Np1D, Np2D]: """ Compute eigenvalues and eigenvectors of Pade' denominator matrix through SVD of R factor. """ RROMPyAssert(self._mode, message = "Cannot solve eigenvalue problem.") nEnd = invD[0].shape[1] S = RPODE.shape[0] eWidth = len(invD) if self.verbosity >= 10: verbosityDepth("INIT", "Building half-gramian matrix stack.", timestamp = self.timestamp) Rstack = np.zeros((S * eWidth, nEnd), dtype = np.complex) for k in range(eWidth): Rstack[k * S : (k + 1) * S, :] = RPODE.dot(invD[k]) if self.verbosity >= 10: verbosityDepth("DEL", "Done building half-gramian.", timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("INIT", ("Solving svd for square root of " "gramian matrix."), timestamp = self.timestamp) _, s, eV = np.linalg.svd(Rstack, full_matrices = False) ev = s[::-1] eV = eV[::-1, :].T.conj() if self.verbosity >= 5: try: condev = s[0] / s[-1] except: condev = np.inf verbosityDepth("MAIN", ("Solved svd problem of size {} x " "{} with condition number " "{:.4e}.").format(*Rstack.shape, condev), timestamp = self.timestamp) if self.verbosity >= 7: verbosityDepth("DEL", "Done solving svd.", timestamp = self.timestamp) return ev, eV def centerNormalize(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. 
mu0: Parameter(s) 2. If None, set to self.mu0. Returns: Normalized parameter. """ return self.trainedModel.centerNormalize(mu, mu0) def getResidues(self) -> Np1D: """ Obtain approximant residues. Returns: Matrix with residues as columns. """ return self.trainedModel.getResidues() diff --git a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py index 9e27318..0ec150b 100644 --- a/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/distributed_greedy/rational_interpolant_greedy.py @@ -1,416 +1,416 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_distributed_greedy_approximant import \ GenericDistributedGreedyApproximant from rrompy.utilities.poly_fitting.polynomial import polybases, polydomcoeff from rrompy.reduction_methods.distributed import RationalInterpolant from rrompy.reduction_methods.trained_model import TrainedModelPade as tModel from rrompy.reduction_methods.trained_model import TrainedModelData from rrompy.utilities.base.types import Np1D, Np2D, DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityDepth from rrompy.utilities.exception_manager import RROMPyWarning from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericDistributedGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. 
Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to Chebyshev sampler within muBounds; - 'polybasis': type of basis for interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'Delta': difference between M and N in rational approximant; defaults to 0; - 'errorEstimatorKind': kind of error estimator; available values include 'EXACT', 'BASIC', and 'BARE'; defaults to 'EXACT'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust Pade' denominator management; defaults to 0. Defaults to empty dict. homogeneized(optional): Whether to homogeneize Dirichlet BCs. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. homogeneized: Whether to homogeneize Dirichlet BCs. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'Delta': difference between M and N in rational approximant; - 'errorEstimatorKind': kind of error estimator; - 'interpRcond': tolerance for interpolation; - 'robustTol': tolerance for robust Pade' denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. POD: whether to compute POD of snapshots. S: number of starting training points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of test points to be exhausted before test set refinement. nTestPoints: number of test points. trainSetGenerator: training sample points generator. Delta: difference between M and N in rational approximant. errorEstimatorKind: kind of error estimator. interpRcond: tolerance for interpolation. robustTol: tolerance for robust Pade' denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uAppReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedAppReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApp: Approximate solution(s) with parameter(s) lastSolvedApp as sampleList. lastSolvedApp: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """
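# Note: a minimal usage sketch for this class, assuming a 1D affinely
# parametric engine `solver` such as the matrixFFT test engine used in the
# test suite below; all parameter values are illustrative only.
#
#     from rrompy.parameter.parameter_sampling import QuadratureSampler as QS
#     params = {"POD": True, "S": 5, "polybasis": "CHEBYSHEV", "Delta": 0,
#               "greedyTol": 1e-2, "maxIter": 25,
#               "errorEstimatorKind": "EXACT",
#               "sampler": QS([1.5, 6.5], "UNIFORM")}
#     approx = RationalInterpolantGreedy(solver, mu0 = 2. + 1.j,
#                                        approxParameters = params,
#                                        verbosity = 0)
#     approx.setupApprox()     # greedy sampling, then rational interpolation
#     approx.normErr(1.5)      # reduced-model error at mu = 1.5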
_allowedEstimatorKinds = ["EXACT", "BASIC", "BARE"] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, homogeneized : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["Delta", "polybasis", "errorEstimatorKind", "interpRcond", "robustTol"], - [0, "MONOMIAL", "EXACT", -1, 0]) + [0, "MONOMIAL", "EXACT", -1, 0], + toBeExcluded = ["E"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, homogeneized = homogeneized, verbosity = verbosity, timestamp = timestamp) if self.verbosity >= 7: verbosityDepth("INIT", "Computing Taylor blocks of system.", timestamp = self.timestamp) nAs = self.HFEngine.nAs - 1 nbs = max(self.HFEngine.nbs, (nAs + 1) * self.homogeneized) self.As = [self.HFEngine.A(self.mu0, j + 1) for j in range(nAs)] self.bs = [self.HFEngine.b(self.mu0, j, self.homogeneized) for j in range(nbs)] if self.verbosity >= 7: verbosityDepth("DEL", "Done computing Taylor blocks.", timestamp = self.timestamp) self._postInit() @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") - if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") + if polybasis not in polybases: + raise RROMPyException("Prescribed polybasis not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def Delta(self): """Value of Delta.""" return self._Delta @Delta.setter def Delta(self, Delta): if not np.isclose(Delta, np.floor(Delta)): raise RROMPyException("Delta must be an integer.") if Delta < 0: RROMPyWarning(("Error estimator unreliable for Delta < 0. " "Overloading of errorEstimator is suggested.")) else: Deltamin = (max(self.HFEngine.nbs, self.HFEngine.nAs * self.homogeneized) - 1 - 1 * (self.HFEngine.nAs > 1)) if Delta < Deltamin: RROMPyWarning(("Method may be unreliable for selected Delta. " "Suggested minimal value of Delta: {}.").format( Deltamin)) self._Delta = Delta self._approxParameters["Delta"] = self.Delta @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " "to 'EXACT'.")) errorEstimatorKind = "EXACT" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= np.abs(self.Delta): RROMPyWarning(("nTestPoints must be at least abs(Delta) + 1. 
" "Increasing value to abs(Delta) + 1.")) nTestPoints = np.abs(self.Delta) + 1 if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() def _errorSamplingRatio(self, mus:Np1D, muRTest:Np1D, muRTrain:Np1D) -> Np1D: """Scalar ratio in explicit error estimator.""" self.setupApprox() testTile = np.tile(np.reshape(muRTest, (-1, 1)), [1, len(muRTrain)]) nodalVals = np.prod(testTile - np.reshape(muRTrain, (1, -1)), axis = 1) denVals = self.trainedModel.getQVal(mus) return np.abs(nodalVals / denVals) def _RHSNorms(self, radiusb0:Np2D) -> Np1D: """High fidelity system RHS norms.""" self.assembleReducedResidualBlocks(full = False) # 'ij,jk,ik->k', resbb, radiusb0, radiusb0.conj() b0resb0 = np.sum(self.trainedModel.data.resbb.dot(radiusb0) * radiusb0.conj(), axis = 0) RHSnorms = np.power(np.abs(b0resb0), .5) return RHSnorms def _errorEstimatorBare(self) -> Np1D: """Bare residual-based error estimator.""" self.setupApprox() self.assembleReducedResidualGramian(self.trainedModel.data.projMat) pDom = self.trainedModel.data.P[:, -1] LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom)) Adiag = self.As[0].diagonal() LL = ((self.scaleFactor[0] * np.linalg.norm(Adiag)) ** 2. * LL / np.size(Adiag)) scalingDom = polydomcoeff(len(self.mus) - 1, self.polybasis) return scalingDom * np.power(np.abs(LL), .5) def _errorEstimatorBasic(self, muTest:paramVal, ratioTest:complex) -> Np1D: """Basic residual-based error estimator.""" resmu = self.HFEngine.residual(self.trainedModel.getApprox(muTest), muTest, self.homogeneized)[0] return np.abs(self.estimatorNormEngine.norm(resmu) / ratioTest) def _errorEstimatorExact(self, muRTrain:Np1D, vanderBase:Np2D) -> Np1D: """Exact residual-based error estimator.""" self.setupApprox() self.assembleReducedResidualBlocks(full = True) nAs = self.HFEngine.nAs - 1 nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized) delta = len(self.mus) - len(self.trainedModel.data.Q) nbsEff = max(0, nbs - delta) momentQ = np.zeros(nbsEff, dtype = np.complex) momentQu = np.zeros((len(self.mus), nAs), dtype = np.complex) radiusbTen = np.zeros((nbsEff, nbsEff, vanderBase.shape[1]), dtype = np.complex) radiusATen = np.zeros((nAs, nAs, vanderBase.shape[1]), dtype = np.complex) if nbsEff > 0: momentQ[0] = self.trainedModel.data.Q[-1] radiusbTen[0, :, :] = vanderBase[: nbsEff, :] momentQu[:, 0] = self.trainedModel.data.P[:, -1] radiusATen[0, :, :] = vanderBase[: nAs, :] Qvals = self.trainedModel.getQVal(self.mus) for k in range(1, max(nAs, nbs * (nbsEff > 0))): Qvals = Qvals * muRTrain if k > delta and k < nbs: momentQ[k - delta] = self._fitinv.dot(Qvals) radiusbTen[k - delta, k :, :] = ( radiusbTen[0, : delta - k, :]) if k < nAs: momentQu[:, k] = Qvals * self._fitinv radiusATen[k, k :, :] = radiusATen[0, : - k, :] if self.POD and nAs > 1: momentQu[:, 1 :] = self.samplingEngine.RPOD.dot( momentQu[:, 1 :]) radiusA = np.tensordot(momentQu, radiusATen, 1) if nbsEff > 0: radiusb = np.tensordot(momentQ, radiusbTen, 1) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb[delta + 1 :, delta + 1 :]\ .dot(radiusb) * radiusb.conj(), axis = 0) # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = 
np.sum(np.tensordot( self.trainedModel.data.resAb[delta :, :, :], radiusA, 2) * radiusb.conj(), axis = 0) else: ff, Lf = 0., 0. # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, radiusA, 2) * radiusA.conj(), axis = (0, 1)) scalingDom = polydomcoeff(len(self.mus) - 1, self.polybasis) return scalingDom * np.power(np.abs(ff - 2. * np.real(Lf) + LL), .5) def errorEstimator(self, mus:Np1D) -> Np1D: """Standard residual-based error estimator.""" self.setupApprox() if (np.any(np.isnan(self.trainedModel.data.P[:, -1])) or np.any(np.isinf(self.trainedModel.data.P[:, -1]))): err = np.empty(len(mus)) err[:] = np.inf return err nAs = self.HFEngine.nAs - 1 nbs = max(self.HFEngine.nbs - 1, nAs * self.homogeneized) muRTest = self.centerNormalize(mus)(0) muRTrain = self.centerNormalize(self.mus)(0) samplingRatio = self._errorSamplingRatio(mus, muRTest, muRTrain) vanderBase = np.polynomial.polynomial.polyvander(muRTest, max(nAs, nbs)).T RHSnorms = self._RHSNorms(vanderBase[: nbs + 1, :]) if self.errorEstimatorKind == "BARE": jOpt = self._errorEstimatorBare() elif self.errorEstimatorKind == "BASIC": idx_muTestSample = np.argmax(samplingRatio) jOpt = self._errorEstimatorBasic(mus[idx_muTestSample], samplingRatio[idx_muTestSample]) else: #if self.errorEstimatorKind == "EXACT": jOpt = self._errorEstimatorExact(muRTrain, vanderBase[: -1, :]) return jOpt * samplingRatio / RHSnorms def setupApprox(self, plotEst : bool = False): """ Compute Pade' interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") if self.verbosity >= 5: verbosityDepth("INIT", "Setting up {}.". format(self.name()), timestamp = self.timestamp) self.computeScaleFactor() self.greedy(plotEst) - self._N = len(self.mus) - 1 - self._M = copy(self._N) - self.E = copy(self._N) + self._S = len(self.mus) + self._N, self._M, self._E = [self._S - 1] * 3 if self.Delta < 0: self._M += self.Delta else: self._N -= self.Delta if self.trainedModel is None: self.trainedModel = tModel() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp data = TrainedModelData(self.trainedModel.name(), self.mu0, self.samplingEngine.samples, self.HFEngine.rescalingExp) data.polytype = self.polybasis data.scaleFactor = self.scaleFactor data.mus = copy(self.mus) self.trainedModel.data = data else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(self.samplingEngine.samples) self.trainedModel.data.mus = copy(self.mus) if min(self.M, self.N) < 0: if self.verbosity >= 5: verbosityDepth("MAIN", "Minimal sample size not achieved.", timestamp = self.timestamp) Q = np.empty(max(self.N, 0) + 1, dtype = np.complex) P = np.empty((len(self.mus), max(self.M, 0) + 1), dtype = np.complex) Q[:] = np.nan P[:] = np.nan self.trainedModel.data.Q = copy(Q) self.trainedModel.data.P = copy(P) self.trainedModel.data.approxParameters = copy( self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Aborting computation of approximant.", timestamp = self.timestamp) return if self.N > 0: Q = self._setupDenominator() else: Q = np.ones(1, dtype = np.complex) self.trainedModel.data.Q = copy(Q) P = self._setupNumerator() if self.POD: P = np.tensordot(self.samplingEngine.RPOD, P, axes = ([-1], [0])) self.trainedModel.data.P = copy(P) self.trainedModel.data.approxParameters = copy(self.approxParameters) if self.verbosity >= 5: verbosityDepth("DEL", "Done setting up approximant.", 
timestamp = self.timestamp) def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of reduced linear system through projections.""" scaling = self.trainedModel.data.scaleFactor[0] self.assembleReducedResidualBlocksbb(self.bs, scaling) if full: pMat = self.trainedModel.data.projMat self.assembleReducedResidualBlocksAb(self.As, self.bs[1 :], pMat, scaling) self.assembleReducedResidualBlocksAA(self.As, pMat, scaling) diff --git a/rrompy/utilities/base/__init__.py b/rrompy/utilities/base/__init__.py index c6c8528..6bcf126 100644 --- a/rrompy/utilities/base/__init__.py +++ b/rrompy/utilities/base/__init__.py @@ -1,53 +1,55 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from .find_dict_str_key import findDictStrKey from .get_new_filename import getNewFilename from .kroneckerer import kroneckerer from .factorials import multibinom, multifactorial from .pickle_utilities import pickleDump, pickleLoad from .purge_dict import purgeDict from .purge_list import purgeList from .number_theory import (squareResonances, primeFactorize, getLowestPrimeFactor) +from .halton import haltonGenerate from .sobol import sobolGenerate from .low_discrepancy import vanderCorput, lowDiscrepancy from . import types as Types from .verbosity_depth import verbosityDepth __all__ = [ 'findDictStrKey', 'getNewFilename', 'kroneckerer', 'multibinom', 'multifactorial', 'pickleDump', 'pickleLoad', 'purgeDict', 'purgeList', 'squareResonances', 'primeFactorize', 'getLowestPrimeFactor', + 'haltonGenerate', 'sobolGenerate', 'vanderCorput', 'lowDiscrepancy', 'Types', 'verbosityDepth' ] diff --git a/rrompy/utilities/base/halton.py b/rrompy/utilities/base/halton.py new file mode 100644 index 0000000..fe1e24c --- /dev/null +++ b/rrompy/utilities/base/halton.py @@ -0,0 +1,52 @@ +# Copyright (C) 2018 by the RROMPy authors +# +# This file is part of RROMPy. +# +# RROMPy is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# RROMPy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with RROMPy. If not, see . 
+# + +import numpy as np + +__all__ = ['haltonGenerate'] + +def next_prime(): + # generator yielding the odd primes 3, 5, 7, 11, ... (2 is never produced) + def is_prime(num): + "Checks if num is a prime value" + for i in range(2, int(num ** 0.5) + 1): + if (num % i) == 0: return False + return True + prime = 3 + while True: + if is_prime(prime): + yield prime + prime += 2 + +def vdc(n, base = 2): + # n-th term of the van der Corput sequence in the prescribed base + vdc, denom = 0, 1 + while n: + denom *= base + n, remainder = divmod(n, base) + vdc += remainder / float(denom) + return vdc + +def haltonGenerate(dim, size, seed = 0, step = 2): + # Halton points: one van der Corput sequence per dimension, with distinct + # prime bases; the first generated prime (3) is discarded below, so the + # bases actually used are 5, 7, 11, ... + seq = np.empty((size, dim), dtype = float) + primeGen = next_prime() + next(primeGen) + for d in range(dim): + base = next(primeGen) + for i in range(seed, step * size + seed, step): + seq[(i - seed) // step, d] = vdc(i, base) + return seq + diff --git a/rrompy/utilities/base/low_discrepancy.py b/rrompy/utilities/base/low_discrepancy.py index 47f83ca..74ce9d7 100644 --- a/rrompy/utilities/base/low_discrepancy.py +++ b/rrompy/utilities/base/low_discrepancy.py @@ -1,46 +1,47 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.base.types import Np1D from rrompy.utilities.exception_manager import RROMPyException __all__ = ['vanderCorput', 'lowDiscrepancy'] def vanderCorput(n:int) -> Np1D: if n <= 0: raise RROMPyException("Only positive integers allowed.") x = [0] * n ln = int(np.ceil(np.log2(n))) for j in range(n): x[j] = int(np.binary_repr(j, width = ln)[::-1], 2) return x -def lowDiscrepancy(n:int) -> Np1D: +def lowDiscrepancy(n:int, inverse : bool = False) -> Np1D: if n <= 0: raise RROMPyException("Only positive integers allowed.") x = [0] * n max2Pow = np.binary_repr(n)[::-1].find('1') max2Fac = 2 ** max2Pow res2 = n // max2Fac xBase = res2 * np.array(vanderCorput(max2Fac), dtype = np.int) stride = ((n - 1) // 2 - 1) * (max2Pow == 0) + 1 for i in range(res2): x[i * max2Fac : (i + 1) * max2Fac] = (i * stride + xBase) % n + if inverse: x = np.argsort(x) return x
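# Note: a short, self-contained sketch of the two generators patched above;
# the assert encodes the behavior of the new `inverse` flag (an argsort of
# the forward ordering), everything else is illustrative only.
#
#     import numpy as np
#     from rrompy.utilities.base import haltonGenerate, lowDiscrepancy
#     pts = haltonGenerate(2, 8)           # 8 quasi-random points in [0, 1)^2
#     order = lowDiscrepancy(8)            # van der Corput-based reshuffling
#     inv = lowDiscrepancy(8, inverse = True)
#     assert np.all(np.argsort(order) == inv)     # inverse permutation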
diff --git a/rrompy/utilities/poly_fitting/polynomial/homogeneization.py b/rrompy/utilities/poly_fitting/polynomial/homogeneization.py index bbcee6e..f42d5e1 100644 --- a/rrompy/utilities/poly_fitting/polynomial/homogeneization.py +++ b/rrompy/utilities/poly_fitting/polynomial/homogeneization.py @@ -1,55 +1,55 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.utilities.base.types import Np1D, Np2D, Tuple, List, paramList -from rrompy.utilities.poly_fitting.polynomial import (polyvander, - nextDerivativeIndices) +from rrompy.utilities.poly_fitting.polynomial import polyvander from rrompy.parameter import checkParameterList __all__ = ['homogeneizationMask', 'homogeneizedpolyvander'] def homogeneizationMask(degs:List[int]) -> Tuple[List[int], List[bool]]: dim = len(degs) N = np.prod([d + 1 for d in degs]) maskN = np.arange(N, dtype = int) remN = np.zeros((N, dim), dtype = int) for j in range(dim - 1, -1, -1): remN[:, j] = maskN % (degs[j] + 1) maskN //= degs[j] + 1 mask = np.sum(remN, axis = 1) <= min(degs) return remN[mask], mask def homogeneizedpolyvander(x:paramList, deg:int, basis:str, derIdxs : List[List[List[int]]] = None, reorder : List[int] = None, scl : Np1D = None)\ -> Tuple[Np2D, List[List[int]], List[int]]: x, _ = checkParameterList(x) degs = [deg] * x.shape[1] VanBase = polyvander(x, degs, basis, derIdxs, reorder, scl) derIdxs, mask = homogeneizationMask(degs) - # FIXME: improve computation of ordering indices + ordIdxs = np.empty(len(derIdxs), dtype = int) - derOrdered = nextDerivativeIndices([], x.shape[1], len(derIdxs)) - for j, derOrd in enumerate(derOrdered): - for k, dIdx in enumerate(derIdxs): - if np.allclose(dIdx, derOrd): - ordIdxs[j] = k - break + derTotal = np.array([np.sum(y) for y in derIdxs]) + idxPrev = 0 + rangeAux = np.arange(len(derIdxs)) + for j in range(deg + 1): + idxLocal = rangeAux[derTotal == j][::-1] + idxPrev += len(idxLocal) + ordIdxs[idxPrev - len(idxLocal) : idxPrev] = idxLocal return VanBase[:, mask], derIdxs, ordIdxs
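# Note: a small, runnable illustration of homogeneizationMask from the file
# above; the values are derived by hand from the code, not from the test
# suite.
#
#     from rrompy.utilities.poly_fitting.polynomial.homogeneization import \
#                                                          homogeneizationMask
#     remN, mask = homogeneizationMask([2, 2])
#     # Of the 9 exponent pairs on the 3 x 3 grid, the 6 with total degree
#     # <= 2 survive: (0,0), (0,1), (0,2), (1,0), (1,1), (2,0); mask flags
#     # their positions in the flattened grid.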
diff --git a/tests/test_3_reduction_methods_1D/rational_pade_1d.py b/tests/test_3_reduction_methods_1D/rational_pade_1d.py index 79e189f..99e9aaf 100644 --- a/tests/test_3_reduction_methods_1D/rational_pade_1d.py +++ b/tests/test_3_reduction_methods_1D/rational_pade_1d.py @@ -1,86 +1,86 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import os import numpy as np from matrix_fft import matrixFFT from rrompy.reduction_methods.centered import RationalPade as RP def test_rho(): mu = 1.5 mu0 = 2. + 1.j solver = matrixFFT() uh = solver.solve(mu)[0] params = {"POD": False, "M": 9, "N": 10, "S": 11, "robustTol": 1e-6} approx = RP(solver, mu0, params, verbosity = 0) approx.setupApprox() if not os.path.isdir("./.pytest_cache"): os.mkdir("./.pytest_cache") filesOut = [x for x in os.listdir("./.pytest_cache") if (x[-4:] == ".pkl" and x[:6] == "outPad")] for fileOut in filesOut: os.remove("./.pytest_cache/" + fileOut) fileStored = approx.storeTrainedModel(".pytest_cache/outPad") filesOut = [x for x in os.listdir("./.pytest_cache") if (x[-4:] == ".pkl" and x[:6] == "outPad")] assert len(filesOut) == 1 assert filesOut[0] == fileStored[- len(filesOut[0]) :] uhP1 = approx.getApprox(mu)[0] errP = approx.getErr(mu)[0] errNP = approx.normErr(mu)[0] myerrP = uhP1 - uh assert np.allclose(np.abs(errP - myerrP), 0., rtol = 1e-3) assert np.isclose(solver.norm(errP), errNP, rtol = 1e-3) resP = approx.getRes(mu)[0] resNP = approx.normRes(mu) assert np.isclose(solver.norm(resP), resNP, rtol = 1e-3) assert np.allclose(np.abs(resP - (solver.b(mu) - solver.A(mu).dot(uhP1))), 0., rtol = 1e-3) del approx approx = RP(solver, mu0, {"S": 3}, verbosity = 0) approx.loadTrainedModel(fileStored) for fileOut in filesOut: os.remove("./.pytest_cache/" + fileOut) uhP2 = approx.getApprox(mu)[0] assert np.allclose(np.abs(uhP1 - uhP2), 0., rtol = 1e-3) def test_E_warn(capsys): mu = 1.5 mu0 = 2. + 1.j solver = matrixFFT() uh = solver.solve(mu)[0] params = {"POD": True, "M": 14, "N": 15, "S": 10} approx = RP(solver, mu0, params, verbosity = 0) approx.setupApprox() out, err = capsys.readouterr() - assert "Prescribed S is too small. Decreasing M." in out - assert "Prescribed S is too small. Decreasing N." in out + assert "Prescribed E is too small. Decreasing M." in out + assert "Prescribed E is too small. Decreasing N." in out assert len(err) == 0 uhP = approx.getApprox(mu)[0] errP = approx.getErr(mu)[0] errNP = approx.normErr(mu)[0] assert np.allclose(np.abs(errP - (uhP - uh)), 0., rtol = 1e-3) assert np.isclose(errNP, 3.5197568e-07, rtol = 1e-1) poles, ress = approx.getResidues() condres = np.linalg.cond(solver.innerProduct(ress, ress)) assert np.isclose(condres, 192.1791778, rtol = 1e-3) assert np.isclose(np.min(np.abs(poles - 2.)), 0., atol = 1e-3) assert np.isclose(np.min(np.abs(poles - 1.)), 0., atol = 1e-2) assert np.isclose(np.min(np.abs(poles - 3.)), 0., atol = 1e-2) diff --git a/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py b/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py index ed14275..4aef1c4 100644 --- a/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py +++ b/tests/test_4_reduction_methods_multiD/rational_interpolant_2d.py @@ -1,77 +1,77 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . 
# import numpy as np from matrix_random import matrixRandom from rrompy.reduction_methods.distributed import RationalInterpolant as RI from rrompy.parameter.parameter_sampling import (QuadratureSampler as QS, ManualSampler as MS) def test_monomials(capsys): mu = [5.05, 7.1] mu0 = [5., 7.] solver = matrixRandom() uh = solver.solve(mu)[0] params = {"POD": False, "M": 4, "N": 4, "S": [4, 4], "robustTol": 1e-6, "interpRcond": 1e-3, "polybasis": "MONOMIAL", "sampler": QS([[4.9, 6.85], [5.1, 7.15]], "UNIFORM")} approx = RI(solver, mu0, params, verbosity = 0) approx.setupApprox() uhP1 = approx.getApprox(mu)[0] errP = approx.getErr(mu)[0] errNP = approx.normErr(mu)[0] myerrP = uhP1 - uh assert np.allclose(np.abs(errP - myerrP), 0., rtol = 1e-3) assert np.isclose(solver.norm(errP), errNP, rtol = 1e-3) resP = approx.getRes(mu)[0] resNP = approx.normRes(mu) assert np.isclose(solver.norm(resP), resNP, rtol = 1e-3) assert np.allclose(np.abs(resP - (solver.b(mu) - solver.A(mu).dot(uhP1))), 0., rtol = 1e-3) assert np.isclose(errNP / solver.norm(uh), 7.537e-4, rtol = 1e-1) out, err = capsys.readouterr() assert ("poorly conditioned. Reducing M" in out) assert len(err) == 0 def test_well_cond(): mu = [5.05, 7.1] mu0 = [5., 7.] solver = matrixRandom() params = {"POD": True, "M": 3, "N": 3, "S": [4, 4], "interpRcond": 1e-10, "polybasis": "CHEBYSHEV", "sampler": QS([[4.9, 6.85], [5.1, 7.15]], "UNIFORM")} approx = RI(solver, mu0, params, verbosity = 0) approx.setupApprox() print(approx.normErr(mu)[0] / approx.normHF(mu)[0]) assert np.isclose(approx.normErr(mu)[0] / approx.normHF(mu)[0], 8.46624e-3, rtol = 1e-1) def test_hermite(): mu = [5.05, 7.1] mu0 = [5., 7.] solver = matrixRandom() sampler0 = QS([[4.9, 6.85], [5.1, 7.15]], "UNIFORM") params = {"POD": True, "M": 3, "N": 3, "S": [25], "polybasis": "CHEBYSHEV", "sampler": MS([[4.9, 6.85], [5.1, 7.15]], points = sampler0.generatePoints([3, 3]))} approx = RI(solver, mu0, params, verbosity = 0) approx.setupApprox() assert np.isclose(approx.normErr(mu)[0] / approx.normHF(mu)[0], - 1.449341e-4, rtol = 1e-1) + 2.36598e-4, rtol = 1e-1) diff --git a/tests/test_4_reduction_methods_multiD/rational_pade_2d.py b/tests/test_4_reduction_methods_multiD/rational_pade_2d.py index 1db8384..01ea5b7 100644 --- a/tests/test_4_reduction_methods_multiD/rational_pade_2d.py +++ b/tests/test_4_reduction_methods_multiD/rational_pade_2d.py @@ -1,64 +1,64 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from matrix_random import matrixRandom from rrompy.reduction_methods.centered import RationalPade as RP def test_rho(): mu = [5.5, 7.5] mu0 = [5., 7.] 
solver = matrixRandom() uh = solver.solve(mu)[0] params = {"POD": False, "M": 3, "N": 3, "S": 10, "robustTol": 1e-6} approx = RP(solver, mu0, params, verbosity = 0) approx.setupApprox() uhP1 = approx.getApprox(mu)[0] errP = approx.getErr(mu)[0] errNP = approx.normErr(mu)[0] myerrP = uhP1 - uh assert np.allclose(np.abs(errP - myerrP), 0., rtol = 1e-3) assert np.isclose(solver.norm(errP), errNP, rtol = 1e-3) resP = approx.getRes(mu)[0] resNP = approx.normRes(mu) assert np.isclose(solver.norm(resP), resNP, rtol = 1e-3) assert np.allclose(np.abs(resP - (solver.b(mu) - solver.A(mu).dot(uhP1))), 0., rtol = 1e-3) def test_E_warn(capsys): mu = [5.5, 7.5] mu0 = [5., 7.] solver = matrixRandom() uh = solver.solve(mu)[0] params = {"POD": True, "M": 3, "N": 4, "S": 10} approx = RP(solver, mu0, params, verbosity = 0) approx.setupApprox() out, err = capsys.readouterr() - assert "Prescribed S is too small. Decreasing M" not in out - assert "Prescribed S is too small. Decreasing N" in out + assert "Prescribed E is too small. Decreasing M" not in out + assert "Prescribed E is too small. Decreasing N" in out assert len(err) == 0 uhP = approx.getApprox(mu)[0] uhNP = approx.normApprox(mu)[0] errP = approx.getErr(mu)[0] errNP = approx.normErr(mu)[0] assert np.allclose(np.abs(errP - (uhP - uh)), 0., rtol = 1e-3) assert np.isclose(errNP / uhNP, 1.41336e-2, rtol = 1e-1)
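# Note: a hypothetical, condensed companion to test_E_warn above; since M = 3
# triggers no warning while N = 4 does, S = 10 in two parameters induces
# E = 3, so choosing M and N both above 3 should fire both renamed warnings.
# The values below are chosen by hand and are not from the original suite.
#
#     def test_E_warn_both(capsys):
#         solver = matrixRandom()
#         params = {"POD": True, "M": 5, "N": 5, "S": 10}
#         approx = RP(solver, [5., 7.], params, verbosity = 0)
#         approx.setupApprox()
#         out, err = capsys.readouterr()
#         assert "Prescribed E is too small. Decreasing M" in out
#         assert "Prescribed E is too small. Decreasing N" in out
#         assert len(err) == 0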