diff --git a/rrompy/reduction_methods/base/trained_model/trained_model.py b/rrompy/reduction_methods/base/trained_model/trained_model.py index bc5b784..832e37a 100644 --- a/rrompy/reduction_methods/base/trained_model/trained_model.py +++ b/rrompy/reduction_methods/base/trained_model/trained_model.py @@ -1,94 +1,95 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod from rrompy.utilities.base.types import Np1D, paramList, sampList from rrompy.parameter import checkParameterList from rrompy.sampling import emptySampleList __all__ = ['TrainedModel'] class TrainedModel: """ ABSTRACT ROM approximant evaluation. Attributes: Data: dictionary with all that can be pickled. """ def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def reset(self): self.lastSolvedApproxReduced = None self.lastSolvedApprox = None @property def npar(self): """Number of parameters.""" return self.data.mu0.shape[1] @abstractmethod def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. (ABSTRACT) Args: mu: Target parameter. """ pass def getApprox(self, mu : paramList = []) -> sampList: """ Evaluate approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApprox") or self.lastSolvedApprox != mu): uApproxR = self.getApproxReduced(mu) self.uApprox = emptySampleList() for i in range(len(mu)): - uApp = self.data.projMat.dot(uApproxR[i]) + uApp = self.data.projMat[:, : uApproxR.shape[0]].dot( + uApproxR[i]) if i == 0: self.uApprox.reset((len(uApp), len(mu)), dtype = uApp.dtype) self.uApprox[i] = uApp self.lastSolvedApprox = mu return self.uApprox @abstractmethod def getPoles(self) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ pass diff --git a/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py b/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py index e38bd15..51aa91e 100644 --- a/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py +++ b/rrompy/reduction_methods/pivoted/greedy/generic_pivoted_greedy_approximant.py @@ -1,459 +1,461 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod from copy import deepcopy as copy import numpy as np from matplotlib import pyplot as plt from rrompy.reduction_methods.pivoted.generic_pivoted_approximant import ( GenericPivotedApproximant, PODGlobal) from rrompy.utilities.base.types import Np1D, Tuple, List, paramVal, paramList from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical.point_matching import (pointMatching, chordalMetricAdjusted, potential) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList, emptyParameterList __all__ = ['GenericPivotedGreedyApproximant'] class GenericPivotedGreedyApproximant(GenericPivotedApproximant): """ ROM pivoted greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'cutOffTolerance': tolerance for ignoring parasitic poles; defaults to np.inf; - 'cutOffKind': kind of cut off strategy; available values include 'SOFT' and 'HARD'; defaults to 'HARD'; - 'matchingWeightError': weight for pole matching optimization in error estimation; defaults to 0; - 'cutOffToleranceError': tolerance for ignoring parasitic poles in error estimation; defaults to 'AUTO', i.e. cutOffTolerance; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': number of starting marginal samples; - 'samplerMarginalGrid': marginal sample point generator via sparse grid; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'MMarginal': degree of marginal interpolant; defaults to 'AUTO', i.e. maximum allowed; - 'greedyTolMarginal': uniform error tolerance for marginal greedy algorithm; defaults to 1e-1; - 'maxIterMarginal': maximum number of marginal greedy steps; defaults to 1e2; - 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 1; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; defaults to -1; - 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'matchingWeight': weight for pole matching optimization; - 'cutOffTolerance': tolerance for ignoring parasitic poles; - 'cutOffKind': kind of cut off strategy; - 'matchingWeightError': weight for pole matching optimization in error estimation; - 'cutOffToleranceError': tolerance for ignoring parasitic poles in error estimation; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'MMarginal': degree of marginal interpolant; - 'greedyTolMarginal': uniform error tolerance for marginal greedy algorithm; - 'maxIterMarginal': maximum number of marginal greedy steps; - 'polydegreetypeMarginal': type of polynomial degree for marginal; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; - 'interpRcondMarginal': tolerance for marginal interpolation. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginalGrid': marginal sample point generator via sparse grid. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. matchingWeight: Weight for pole matching optimization. cutOffTolerance: Tolerance for ignoring parasitic poles. cutOffKind: Kind of cut off strategy. matchingWeightError: Weight for pole matching optimization in error estimation. cutOffToleranceError: Tolerance for ignoring parasitic poles in error estimation. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginalGrid: Marginal sample point generator via sparse grid. polybasisMarginal: Type of polynomial basis for marginal interpolation. MMarginal: Degree of marginal interpolant. greedyTolMarginal: Uniform error tolerance for marginal greedy algorithm. maxIterMarginal: Maximum number of marginal greedy steps. polydegreetypeMarginal: Type of polynomial degree for marginal. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. nNearestNeighborMarginal: Number of marginal nearest neighbors considered if polybasisMarginal allows. interpRcondMarginal: Tolerance for marginal interpolation. muBounds: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, *args, **kwargs): self._preInit() from rrompy.parameter import localSparseGrid as SG SGBase = SG([[0.], [1.]], "UNIFORM") self._addParametersToList(["matchingWeightError", "cutOffToleranceError", "greedyTolMarginal", "maxIterMarginal"], [0., "AUTO", 1e-1, 1e2], ["samplerMarginalGrid"], [SGBase], toBeExcluded = ["samplerMarginal"]) super().__init__(*args, **kwargs) self._postInit() @property def muBoundsMarginal(self): """Value of muBoundsMarginal.""" return self.samplerMarginalGrid.lims @property def samplerMarginalGrid(self): """Value of samplerMarginalGrid.""" return self._samplerMarginalGrid @samplerMarginalGrid.setter def samplerMarginalGrid(self, samplerMarginalGrid): if 'refine' not in dir(samplerMarginalGrid): raise RROMPyException("Marginal sampler type not recognized.") if (hasattr(self, '_samplerMarginalGrid') and self._samplerMarginalGrid is not None): samplerOld = self.samplerMarginalGrid self._samplerMarginalGrid = samplerMarginalGrid self._approxParameters["samplerMarginalGrid"] = ( self.samplerMarginalGrid.__str__()) if (not 'samplerOld' in locals() or samplerOld != self.samplerMarginalGrid): self.resetSamples() @property def matchingWeightError(self): """Value of matchingWeightError.""" return self._matchingWeightError @matchingWeightError.setter def matchingWeightError(self, matchingWeightError): self._matchingWeightError = matchingWeightError self._approxParameters["matchingWeightError"] = ( self.matchingWeightError) @property def cutOffToleranceError(self): """Value of cutOffToleranceError.""" return self._cutOffToleranceError @cutOffToleranceError.setter def cutOffToleranceError(self, cutOffToleranceError): if isinstance(cutOffToleranceError, (str,)): cutOffToleranceError = cutOffToleranceError.upper()\ .strip().replace(" ","") if cutOffToleranceError != "AUTO": RROMPyWarning(("String value of cutOffToleranceError not " "recognized. Overriding to 'AUTO'.")) cutOffToleranceError == "AUTO" self._cutOffToleranceError = cutOffToleranceError self._approxParameters["cutOffToleranceError"] = ( self.cutOffToleranceError) @property def greedyTolMarginal(self): """Value of greedyTolMarginal.""" return self._greedyTolMarginal @greedyTolMarginal.setter def greedyTolMarginal(self, greedyTolMarginal): if greedyTolMarginal < 0: raise RROMPyException("greedyTolMarginal must be non-negative.") if (hasattr(self, "_greedyTolMarginal") and self.greedyTolMarginal is not None): greedyTolMarginalold = self.greedyTolMarginal else: greedyTolMarginalold = -1 self._greedyTolMarginal = greedyTolMarginal self._approxParameters["greedyTolMarginal"] = self.greedyTolMarginal if greedyTolMarginalold != self.greedyTolMarginal: self.resetSamples() @property def maxIterMarginal(self): """Value of maxIterMarginal.""" return self._maxIterMarginal @maxIterMarginal.setter def maxIterMarginal(self, maxIterMarginal): if maxIterMarginal <= 0: raise RROMPyException("maxIterMarginal must be positive.") if (hasattr(self, "_maxIterMarginal") and self.maxIterMarginal is not None): maxIterMarginalold = self.maxIterMarginal else: maxIterMarginalold = -1 self._maxIterMarginal = maxIterMarginal self._approxParameters["maxIterMarginal"] = self.maxIterMarginal if maxIterMarginalold != self.maxIterMarginal: self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() if not hasattr(self, "_temporaryPivot"): self._mus = emptyParameterList() self.musMarginal = emptyParameterList() if hasattr(self, "samplerMarginalGrid"): self.samplerMarginalGrid.reset() if hasattr(self, "samplingEngine") and self.samplingEngine is not None: self.samplingEngine.resetHistory() def errorEstimatorMarginal(self, return_max : bool = False) -> Np1D: vbMng(self, "INIT", "Matching poles.", 10) self.trainedModel.initializeFromRational( self.HFEngine, self.matchingWeight, self.POD == PODGlobal, self.approx_state) vbMng(self, "DEL", "Done matching poles.", 10) self._finalizeMarginalization() _tMdataFull = copy(self.trainedModel.data) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format( self.trainedModel.data.musMarginal), 10) err = np.zeros(len(self.trainedModel.data.musMarginal)) if len(err) <= 1: err[:] = np.inf else: if self.cutOffToleranceError == "AUTO": cutOffTolErr = self.cutOffTolerance else: cutOffTolErr = self.cutOffToleranceError if not hasattr(self, "_MMarginal_isauto"): if not hasattr(self, "_MMarginalOriginal"): self._MMarginalOriginal = self.MMarginal self.MMarginal = self._MMarginalOriginal _musMExcl = None self.verbosity -= 35 self.trainedModel.verbosity -= 35 foci = self.samplerPivot.normalFoci() ground = self.samplerPivot.groundPotential() for j in range(len(err)): jEff = j - (j > 0) muTest = self.trainedModel.data.musMarginal[jEff] polesEx = self.trainedModel.data.HIs[jEff].poles idxExEff = np.where(potential(polesEx, foci) - ground <= cutOffTolErr * ground)[0] polesEx = polesEx[idxExEff] if self.matchingWeightError != 0: resEx = self.trainedModel.data.HIs[jEff].coeffs[idxExEff] else: resEx = None if j > 0: self.musMarginal.insert(_musMExcl, j - 1) _musMExcl = self.musMarginal[j] self.musMarginal.pop(j) if len(polesEx) == 0: continue self.trainedModel.updateEffectiveSamples( self.HFEngine, [j], self.matchingWeight, self.POD == PODGlobal, self.approx_state) self._reduceDegreeNNoWarn = 1 self._finalizeMarginalization() polesAp = self.trainedModel.interpolateMarginalPoles(muTest)[ ..., 0] idxApEff = np.where(potential(polesAp, foci) - ground <= cutOffTolErr * ground)[0] polesAp = polesAp[idxApEff] if self.matchingWeightError != 0: resAp = self.trainedModel.interpolateMarginalCoeffs( muTest)[idxApEff, :, 0] if self.POD != PODGlobal: - resEx = self.trainedModel.data.projMat.dot(resEx.T) - resAp = self.trainedModel.data.projMat.dot(resAp.T) + resEx = self.trainedModel.data.projMat[:, + : resEx.shape[1]].dot(resEx.T) + resAp = self.trainedModel.data.projMat[:, + : resAp.shape[1]].dot(resAp.T) else: resAp = None dist = chordalMetricAdjusted( polesEx, polesAp, self.matchingWeightError, resEx, resAp, self.HFEngine, self.approx_state) pmR, pmC = pointMatching(dist) err[j] = np.mean(dist[pmR, pmC]) self.trainedModel.updateEffectiveSamples(self.HFEngine, None, self.matchingWeight, self.POD == PODGlobal, self.approx_state) if not hasattr(self, "_MMarginal_isauto"): self.MMarginal = self._MMarginalOriginal self.musMarginal.append(_musMExcl) self.verbosity += 35 self.trainedModel.verbosity += 35 self.trainedModel.data = _tMdataFull del self._reduceDegreeNNoWarn vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if not return_max: return err idxMaxEst = np.where(err > self.greedyTolMarginal)[0] return err, idxMaxEst, err[idxMaxEst] def plotEstimatorMarginal(self, est:Np1D, idxMax:List[int], estMax:List[float]): if not (np.any(np.isnan(est)) or np.any(np.isinf(est))): fig = plt.figure(figsize = plt.figaspect(1. / self.nparMarginal)) for jpar in range(self.nparMarginal): ax = fig.add_subplot(1, self.nparMarginal, 1 + jpar) musre = copy(self.trainedModel.data.musMarginal.re.data) errCP = copy(est) idx = np.delete(np.arange(self.nparMarginal), jpar) while len(musre) > 0: if self.nparMarginal == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, idx] - musre[0, idx]), 1), 0.))[0] currIdxSorted = currIdx[np.argsort(musre[currIdx, jpar])] ax.semilogy(musre[currIdxSorted, jpar], errCP[currIdxSorted], 'k.-', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) ax.semilogy(self.musMarginal.re(jpar), (self.greedyTolMarginal,) * len(self.musMarginal), '*m') if len(idxMax) > 0 and estMax is not None: ax.semilogy(self.trainedModel.data.musMarginal.re( idxMax, jpar), estMax, 'xr') ax.grid() plt.tight_layout() plt.show() def _addMarginalSample(self, mus:paramList): mus = checkParameterList(mus, self.nparMarginal)[0] if len(mus) == 0: return nmus = len(mus) vbMng(self, "MAIN", ("Adding marginal sample point{} no. {}{} at {} to training " "set.").format("s" * (nmus > 1), len(self.musMarginal) + 1, "--{}".format(len(self.musMarginal) + nmus) * (nmus > 1), mus), 3) self.musMarginal.append(mus) self.setupApproxPivoted(mus) self._SMarginal = len(self.musMarginal) self._approxParameters["SMarginal"] = self.SMarginal def greedyNextSampleMarginal(self, muidx:int, plotEst : str = "NONE") \ -> Tuple[Np1D, int, float, paramVal]: RROMPyAssert(self._mode, message = "Cannot add greedy sample.") idxAdded = self.samplerMarginalGrid.refine(muidx) self._addMarginalSample(self.samplerMarginalGrid.points[idxAdded]) errorEstTest, muidx, maxErrorEst = self.errorEstimatorMarginal(True) if plotEst == "ALL": self.plotEstimatorMarginal(errorEstTest, muidx, maxErrorEst) return (errorEstTest, muidx, maxErrorEst, self.samplerMarginalGrid.points[muidx]) def _preliminaryTrainingMarginal(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if np.sum(self.samplingEngine.nsamples) > 0: return self.resetSamples() idx = [0] while self.samplerMarginalGrid.npoints < self.SMarginal: idx = self.samplerMarginalGrid.refine(idx) self._addMarginalSample(self.samplerMarginalGrid.points) @abstractmethod def setupApproxPivoted(self, mu:paramVal) -> int: if self.checkComputedApproxPivoted(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up pivoted approximant.", 10) pass vbMng(self, "DEL", "Done setting up pivoted approximant.", 10) return 0 def setupApprox(self, plotEst : str = "NONE") -> int: """Compute greedy snapshots of solution map.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") vbMng(self, "INIT", "Starting computation of snapshots.", 3) self._preliminaryTrainingMarginal() muidx, max2ErrorEst, firstGreedyIter = [], np.inf, True while firstGreedyIter or (max2ErrorEst > self.greedyTolMarginal and self.samplerMarginalGrid.npoints < self.maxIterMarginal): errorEstTest, muidx, maxErrorEst, mu = \ self.greedyNextSampleMarginal(muidx, plotEst) if len(maxErrorEst) > 0: max2ErrorEst = np.max(maxErrorEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(max2ErrorEst), 3) else: max2ErrorEst = 0. firstGreedyIter = False if plotEst == "LAST": self.plotEstimatorMarginal(errorEstTest, muidx, maxErrorEst) vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(np.sum(self.samplingEngine.nsamples)), 3) return 0 def checkComputedApprox(self) -> bool: return (super().checkComputedApprox() and len(self.mus) == len(self.trainedModel.data.mus)) def checkComputedApproxPivoted(self) -> bool: return (super().checkComputedApprox() and len(self.musMarginal) == len(self.trainedModel.data.musMarginal)) diff --git a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted.py b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted.py index f29a97f..07ccbc9 100644 --- a/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted.py +++ b/rrompy/reduction_methods/pivoted/trained_model/trained_model_pivoted.py @@ -1,487 +1,488 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.special import factorial as fact from copy import deepcopy as copy from itertools import combinations from rrompy.reduction_methods.standard.trained_model.trained_model_rational \ import TrainedModelRational from rrompy.utilities.base.types import (Np1D, Np2D, List, ListAny, paramVal, paramList, sampList, HFEng) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical.point_matching import (pointMatching, chordalMetricAdjusted, potential) from rrompy.utilities.numerical.degree import reduceDegreeN from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( MovingLeastSquaresInterpolator as MLSI) from rrompy.utilities.poly_fitting.heaviside import (heavisideUniformShape, rational2heaviside, HeavisideInterpolator as HI) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import emptySampleList __all__ = ['TrainedModelPivoted'] class TrainedModelPivoted(TrainedModelRational): """ ROM approximant evaluation for pivoted approximants (with pole matching). Attributes: Data: dictionary with all that can be pickled. """ def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.nparPivot)[0] if mu0 is None: mu0 = self.data.mu0Pivot rad = ((mu ** self.data.rescalingExpPivot - mu0 ** self.data.rescalingExpPivot) / self.data.scaleFactorPivot) return rad def centerNormalizeMarginal(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.nparMarginal)[0] if mu0 is None: mu0 = self.data.mu0Marginal rad = ((mu ** self.data.rescalingExpMarginal - mu0 ** self.data.rescalingExpMarginal) / self.data.scaleFactorMarginal) return rad def updateEffectiveSamples(self, HFEngine:HFEng, exclude : List[int] = None, matchingWeight : float = 1., POD : bool = True, is_state : bool = True): if hasattr(self, "_idxExcl"): for j, excl in enumerate(self._idxExcl): self.data.musMarginal.insert(self._musMExcl[j], excl) self.data.HIs.insert(excl, self._HIsExcl[j]) self.data.Ps.insert(excl, self._PsExcl[j]) self.data.Qs.insert(excl, self._QsExcl[j]) if exclude is None: exclude = [] self._idxExcl, self._musMExcl = list(np.sort(exclude)), [] self._HIsExcl, self._PsExcl, self._QsExcl = [], [], [] for excl in self._idxExcl[::-1]: self._musMExcl = [self.data.musMarginal[excl]] + self._musMExcl self.data.musMarginal.pop(excl) self._HIsExcl = [self.data.HIs.pop(excl)] + self._HIsExcl self._PsExcl = [self.data.Ps.pop(excl)] + self._PsExcl self._QsExcl = [self.data.Qs.pop(excl)] + self._QsExcl poles = [hi.poles for hi in self.data.HIs] coeffs = [hi.coeffs for hi in self.data.HIs] self.initializeFromLists(poles, coeffs, self.data.HIs[0].polybasis, HFEngine, matchingWeight, POD, is_state) def setupMarginalInterp(self, approx, interpPars:ListAny, MMAuto:bool, rDWM : Np1D = None, noWarnReduceAuto : bool = True): vbMng(self, "INIT", "Starting computation of marginal interpolator.", 12) musMCN = self.centerNormalizeMarginal(self.data.musMarginal) pbM = approx.polybasisMarginal if pbM not in ppb: rDWMEf = np.array(rDWM) self.data.marginalInterp = [] for ipts, pts in enumerate(self.data.suppEffPts): mI = [None] * len(musMCN) if len(pts) > 0: musMCNEff = musMCN[pts] if MMAuto: if ipts > 0: verb = approx.verbosity approx.verbosity = 0 _musM = approx.musMarginal approx.musMarginal = musMCNEff approx._setMMarginalAuto() if ipts > 0: approx.musMarginal = _musM approx.verbosity = verb if ipts == 0: _MMarginalEffective = approx.MMarginal if not MMAuto: approx.MMarginal = _MMarginalEffective MM = reduceDegreeN(approx.MMarginal, len(musMCNEff), self.data.nparMarginal, approx.polydegreetypeMarginal) if MM < approx.MMarginal: if ipts == 0 and not noWarnReduceAuto: RROMPyWarning(("MMarginal too large compared to " "SMarginal. Reducing MMarginal by " "{}").format(approx.MMarginal - MM)) approx.MMarginal = MM MMEff = approx.MMarginal for j in range(len(musMCNEff)): canonicalj = 1. * (np.arange(len(musMCNEff)) == j) while MMEff >= 0 and (pbM in ppb or rDWMEf[0] <= rDWM[0] * 2 ** 6): pParRest = copy(interpPars) if pbM in ppb: p = PI() else: pParRest = [rDWMEf] + pParRest p = RBI() if pbM in rbpb else MLSI() wellCond, msg = p.setupByInterpolation(musMCNEff, canonicalj, MMEff, pbM, *pParRest) vbMng(self, "MAIN", msg, 30) if wellCond: break if pbM in ppb: vbMng(self, "MAIN", ("Polyfit is poorly conditioned. Reducing " "MMarginal by 1."), 35) MMEff -= 1 else: vbMng(self, "MAIN", ("Polyfit is poorly conditioned. " "Multiplying radialDirectionalWeightsMarginal " "by 2."), 35) rDWMEf *= 2. if MMEff < 0 or (pbM not in ppb and rDWMEf[0] > rDWM[0] * 2 ** 6): raise RROMPyException(("Instability in computation of " "interpolant. Aborting.")) if pbM in ppb: MMEff = approx.MMarginal else: rDWMEf = np.array(rDWM) mI[pts[j]] = copy(p) self.data.marginalInterp += [mI] approx.MMarginal = _MMarginalEffective vbMng(self, "DEL", "Done computing marginal interpolator.", 12) def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str, HFEngine:HFEng, matchingWeight : float = 1., POD : bool = True, is_state : bool = True): """Initialize Heaviside representation.""" musM = self.data.musMarginal margAbsDist = np.sum(np.abs(np.repeat(musM.data, len(musM), 0) - np.tile(musM.data, [len(musM), 1]) ), axis = 1).reshape(len(musM), len(musM)) explored = [0] unexplored = list(range(1, len(musM))) poles, coeffs = heavisideUniformShape(poles, coeffs) N = len(poles[0]) for _ in range(1, len(musM)): minIdx = np.argmin(np.concatenate([margAbsDist[ex, unexplored] \ for ex in explored])) minIex = explored[minIdx // len(unexplored)] minIunex = unexplored[minIdx % len(unexplored)] resex = coeffs[minIex][: N] resunex = coeffs[minIunex][: N] if matchingWeight != 0 and not POD: - resex = self.data.projMat.dot(resex.T) - resunex = self.data.projMat.dot(resunex.T) + resex = self.data.projMat[:, : resex.shape[1]].dot(resex.T) + resunex = self.data.projMat[:, : resunex.shape[1]].dot( + resunex.T) dist = chordalMetricAdjusted(poles[minIex], poles[minIunex], matchingWeight, resex, resunex, HFEngine, is_state) reordering = pointMatching(dist)[1] poles[minIunex] = poles[minIunex][reordering] coeffs[minIunex][: N] = coeffs[minIunex][reordering] explored += [minIunex] unexplored.remove(minIunex) HIs = [] for pls, cfs in zip(poles, coeffs): hsi = HI() hsi.poles = pls hsi.coeffs = cfs hsi.npar = 1 hsi.polybasis = basis HIs += [hsi] self.data.HIs = HIs self.data.suppEffPts = [np.arange(len(self.data.HIs))] self.data.suppEffIdx = np.zeros(N, dtype = int) def initializeFromRational(self, HFEngine:HFEng, matchingWeight : float = 1., POD : bool = True, is_state : bool = True): """Initialize Heaviside representation.""" RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") poles, coeffs = [], [] for Q, P in zip(self.data.Qs, self.data.Ps): cfs, pls, basis = rational2heaviside(P, Q) poles += [pls] coeffs += [cfs] self.initializeFromLists(poles, coeffs, basis, HFEngine, matchingWeight, POD, is_state) def recompressByCutOff(self, tol:float, kind:str, foci:List[np.complex], ground:float) -> str: N = len(self.data.HIs[0].poles) mu0 = np.mean(foci) goodLocPoles = np.array([potential(hi.poles, foci) - ground <= tol * ground for hi in self.data.HIs]) self.data.suppEffPts = [np.arange(len(self.data.HIs))] self.data.suppEffIdx = np.zeros(N, dtype = int) if np.all(goodLocPoles): return " No poles erased." kind = kind.upper().strip().replace(" ","") goodAllPoles = np.all(goodLocPoles, axis = 0) badPoles = np.logical_not(goodAllPoles) if kind == "HARD": keepPole = np.where(goodAllPoles)[0] halfPole = np.empty(0, dtype = int) removePole = np.where(badPoles)[0] elif kind == "SOFT": goodSomePoles = np.any(goodLocPoles, axis = 0) keepPole = np.where(goodSomePoles)[0] halfPole = np.where(np.logical_and(badPoles, goodSomePoles))[0] removePole = np.where(np.logical_not(goodSomePoles))[0] else: raise RROMPyException("Cutoff kind not recognized.") if len(removePole) > 0: keepCoeff = np.append(keepPole, np.append([N], np.arange(N + 1, len(self.data.HIs[0].coeffs)))) for hi in self.data.HIs: polyCorrection = np.zeros_like(hi.coeffs[0, :]) for j in removePole: if not np.isinf(hi.poles[j]): polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j]) if len(hi.coeffs) == N: hi.coeffs = np.vstack((hi.coeffs, polyCorrection)) else: hi.coeffs[N, :] += polyCorrection hi.poles = hi.poles[keepPole] hi.coeffs = hi.coeffs[keepCoeff, :] for idxR in halfPole: pts = np.where(goodLocPoles[:, idxR])[0] idxEff = len(self.data.suppEffPts) for idEff, prevPts in enumerate(self.data.suppEffPts): if len(prevPts) == len(pts): if np.allclose(prevPts, pts): idxEff = idEff break if idxEff == len(self.data.suppEffPts): self.data.suppEffPts += [pts] self.data.suppEffIdx[idxR] = idxEff self.data.suppEffIdx = self.data.suppEffIdx[keepPole] return (" Hard-erased {} pole".format(len(removePole)) + "s" * (len(removePole) != 1) + " and soft-erased {} pole".format(len(halfPole)) + "s" * (len(halfPole) != 1) + ".") def _interpolateMarginal(self, muC : paramList, objs : ListAny) -> Np2D: res = np.zeros(objs[0].shape + (len(muC),), dtype = objs[0].dtype) for suppIdx in range(len(self.data.suppEffPts)): i = np.where(self.data.suppEffIdx == suppIdx)[0] if suppIdx == 0: i = np.append(i, np.arange(len(self.data.suppEffIdx), len(res))) if len(i) > 0: for mIj, obj in zip(self.data.marginalInterp[suppIdx], objs): if mIj is not None: res[i] += np.expand_dims(obj[i], - 1) * mIj(muC) return res def interpolateMarginalInterpolator(self, mu : paramVal = []) -> Np1D: """Obtain interpolated approximant interpolator.""" mu = checkParameter(mu, self.data.nparMarginal)[0] hsi = HI() hsi.poles = self.interpolateMarginalPoles(mu)[..., 0] hsi.coeffs = self.interpolateMarginalCoeffs(mu)[..., 0] hsi.npar = 1 hsi.polybasis = self.data.HIs[0].polybasis return hsi def interpolateMarginalPoles(self, mu : paramList = []) -> Np1D: """Obtain interpolated approximant poles.""" mu = checkParameterList(mu, self.data.nparMarginal)[0] muC = self.centerNormalizeMarginal(mu) vbMng(self, "INIT", "Interpolating marginal poles at mu = {}.".format(mu), 95) intMPoles = self._interpolateMarginal(muC, [hi.poles for hi in self.data.HIs]) vbMng(self, "DEL", "Done interpolating marginal poles.", 95) return intMPoles def interpolateMarginalCoeffs(self, mu : paramList = []) -> Np1D: """Obtain interpolated approximant coefficients.""" mu = checkParameterList(mu, self.data.nparMarginal)[0] muC = self.centerNormalizeMarginal(mu) vbMng(self, "INIT", "Interpolating marginal coefficients at mu = {}.".format(mu), 95) intMCoeffs = self._interpolateMarginal(muC, [hi.coeffs for hi in self.data.HIs]) vbMng(self, "DEL", "Done interpolating marginal coefficients.", 95) return intMCoeffs def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) self.uApproxReduced = emptySampleList() for i, muPL in enumerate(mu): muL = self.centerNormalizePivot([muPL(0, x) \ for x in self.data.directionPivot]) muM = [muPL(0, x) for x in self.data.directionMarginal] vbMng(self, "INIT", "Assembling reduced model for mu = {}.".format(muPL), 87) hsL = self.interpolateMarginalInterpolator(muM) vbMng(self, "DEL", "Done assembling reduced model.", 87) uAppR = hsL(muL) if i == 0: self.uApproxReduced.reset((len(uAppR), len(mu)), dtype = uAppR.dtype) self.uApproxReduced[i] = uAppR vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] p = emptySampleList() p.reset((len(self.data.HIs[0].coeffs.shape[1]), len(mu))) for i, muPL in enumerate(mu): muL = self.centerNormalizePivot([muPL(0, x) \ for x in self.data.directionPivot]) muM = [muPL(0, x) for x in self.data.directionMarginal] hsL = self.interpolateMarginalInterpolator(muM) p[i] = hsL(muL) * np.prod(muL(0, 0) - hsL.poles) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] muP = self.centerNormalizePivot(checkParameterList( mu.data[:, self.data.directionPivot], self.data.nparPivot)[0]) muM = checkParameterList(mu.data[:, self.data.directionMarginal], self.data.nparMarginal)[0] if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] derVal = np.zeros(len(mu), dtype = np.complex) N = len(self.data.HIs[0].poles) if derP == N: derVal[:] = 1. elif derP >= 0 and derP < N: pls = self.interpolateMarginalPoles(muM).T plsDist = muP.data.reshape(-1, 1) - pls for terms in combinations(np.arange(N), N - derP): derVal += np.prod(plsDist[:, list(terms)], axis = 1) return sclP ** derP * fact(derP) * derVal def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] roots = self.interpolateMarginalPoles(mMarg)[..., 0] return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim] + self.data.scaleFactor[rDim] * roots, 1. / self.data.rescalingExp[rDim]) def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] residues = self.interpolateMarginalCoeffs(mMarg)[: len(pls), :, 0] - res = self.data.projMat.dot(residues.T) + res = self.data.projMat[:, : residues.shape[1]].dot(residues.T) return pls, res diff --git a/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py index 7eafcef..f90c1c4 100644 --- a/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py @@ -1,547 +1,548 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.utilities.poly_fitting.polynomial import (polybases, PolynomialInterpolator as PI, polyvanderTotal as pvT) from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.degree import totalDegreeN from rrompy.utilities.expression import expressionEvaluator from rrompy.reduction_methods.standard import RationalInterpolant from rrompy.utilities.base.types import Np1D, Tuple, paramVal, List from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting import customFit from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert, RROMPy_FRAGILE) from rrompy.parameter import checkParameterList from rrompy.sampling import sampleList, emptySampleList __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'polybasis': type of basis for interpolation; defaults to 'MONOMIAL'; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'LOOK_AHEAD', 'LOOK_AHEAD_RES', 'LOOK_AHEAD_OUTPUT', and 'NONE'; defaults to 'NONE'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'interpRcond': tolerance for interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. robustTol: tolerance for robust rational denominator management. errorEstimatorKind: kind of error estimator. interpRcond: tolerance for interpolation. robustTol: tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ _allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "LOOK_AHEAD", "LOOK_AHEAD_RES", "LOOK_AHEAD_OUTPUT", "NONE"] def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["errorEstimatorKind"], ["DISCREPANCY"], toBeExcluded = ["M", "N", "polydegreetype", "radialDirectionalWeights", "nNearestNeighbor"]) super().__init__(*args, **kwargs) if not self.approx_state and self.errorEstimatorKind not in [ "LOOK_AHEAD", "LOOK_AHEAD_OUTPUT", "NONE"]: raise RROMPyException(("Must compute greedy approximation of " "state, unless error estimator allows " "otherwise.")) self.M, self.N = ("AUTO",) * 2 self._postInit() @property def approx_state(self): """Value of approx_state.""" return self._approx_state @approx_state.setter def approx_state(self, approx_state): RationalInterpolant.approx_state.fset(self, approx_state) if (not self.approx_state and hasattr(self, "_errorEstimatorKind") and self.errorEstimatorKind not in [ "LOOK_AHEAD", "LOOK_AHEAD_OUTPUT", "NONE"]): raise RROMPyException(("Must compute greedy approximation of " "state, unless error estimator allows " "otherwise.")) @property def E(self): """Value of E.""" self._E = self.sampleBatchIdx - 1 return self._E @E.setter def E(self, E): RROMPyWarning(("E is used just to simplify inheritance, and its value " "cannot be changed from that of sampleBatchIdx - 1.")) def _setMAuto(self): self.M = self.E def _setNAuto(self): self.N = self.E @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " "to 'NONE'.")) errorEstimatorKind = "NONE" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind if (self.errorEstimatorKind not in [ "LOOK_AHEAD", "LOOK_AHEAD_OUTPUT", "NONE"] and hasattr(self, "_approx_state") and not self.approx_state): raise RROMPyException(("Must compute greedy approximation of " "state, unless error estimator allows " "otherwise.")) def _polyvanderAuxiliary(self, mus, deg, *args): return pvT(mus, deg, *args) def getErrorEstimatorDiscrepancy(self, mus:Np1D) -> Np1D: """Discrepancy-based residual estimator.""" checkIfAffine(self.HFEngine, "apply discrepancy-based error estimator") mus = checkParameterList(mus, self.npar)[0] muCTest = self.trainedModel.centerNormalize(mus) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) QTzero = np.where(QTest == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTest[QTzero] = np.finfo(np.complex).eps / (1. + self.N) self.HFEngine.buildA() self.HFEngine.buildb() nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs muTrainEff = self.mus ** self.HFEngine.rescalingExp muTestEff = mus ** self.HFEngine.rescalingExp PTrain = self.trainedModel.getPVal(self.mus).data.T QTrain = self.trainedModel.getQVal(self.mus) QTzero = np.where(QTrain == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTrain[QTzero] = np.finfo(np.complex).eps / (1. + self.N) PTest = self.trainedModel.getPVal(mus).data radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), dtype = np.complex) radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) radiusb = np.empty((nbs, len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): idxs = j * self.S + np.arange(self.S) radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], muTrainEff, (self.S, 1)) * PTrain radiusA[:, j] = PTest * expressionEvaluator(thA[0], muTestEff, (len(mus),)) for j, thb in enumerate(self.HFEngine.thbs): idx = nAs * self.S + j radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], muTrainEff, (self.S,)) radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, (len(mus),)) QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) vanTrain = self._polyvanderAuxiliary(self._musUniqueCN, self.E, self.polybasis0, self._derIdxs, self._reorder) interpPQ = customFit(vanTrain, radiusAbTrain, rcond = self.interpRcond) vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis0) DradiusAb = vanTest.dot(interpPQ) radiusA = (radiusA - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) radiusb = radiusb - DradiusAb[:, - nbs :].T ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5 self.trainedModel.verbosity = verb return err def getErrorEstimatorLookAhead(self, mus:Np1D, what : str = "") -> Tuple[Np1D, List[int]]: """Residual estimator based on look-ahead idea.""" errTest, QTest, idxMaxEst = self._EIMStep(mus) _approx_state_old = self.approx_state if what == "OUTPUT" and _approx_state_old: self._approx_state = False self.initEstimatorNormEngine() self._approx_state = _approx_state_old mu_muTestSample = mus[idxMaxEst] app_muTestSample = self.getApproxReduced(mu_muTestSample) if self._mode == RROMPy_FRAGILE: - if what == "RES" and not self.HFEngine.isCEye: + if what == "RES" and not self.HFEngine.isCEye: raise RROMPyException(("Cannot compute LOOK_AHEAD_RES " - "estimator in fragile mode for " - "non-scalar C.")) - app_muTestSample = dot(self.trainedModel.data.projMat, + "estimator in fragile mode for " + "non-scalar C.")) + app_muTestSample = dot(self.trainedModel.data.projMat[:, + : app_muTestSample.shape[0]], app_muTestSample.data) else: app_muTestSample = dot(self.samplingEngine.samples, app_muTestSample) if what == "RES": - errmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, - post_c = False) + errmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, + post_c = False) solmu = self.HFEngine.residual(mu_muTestSample, None, - post_c = False) + post_c = False) else: for j, mu in enumerate(mu_muTestSample): uEx = self.samplingEngine.nextSample(mu) if hasattr(self.samplingEngine, "samples_full"): uEx = self.samplingEngine.samples_full[-1] if j == 0: solmu = emptySampleList() solmu.reset((len(uEx), len(mu_muTestSample)), dtype = uEx.dtype) solmu[j] = uEx if what == "OUTPUT" and self.approx_state: solmu = sampleList(self.HFEngine.applyC(solmu.data)) app_muTestSample = sampleList(self.HFEngine.applyC( app_muTestSample.data)) errmu = solmu - app_muTestSample errsamples = (self.estimatorNormEngine.norm(errmu) / self.estimatorNormEngine.norm(solmu)) musT = copy(self.mus) musT.append(mu_muTestSample) musT = self.trainedModel.centerNormalize(musT) musC = self.trainedModel.centerNormalize(mus) errT = np.zeros(len(musT), dtype = np.complex) err = np.zeros(len(mus)) for l in range(len(mu_muTestSample)): errT[len(self.mus) + l] = errsamples[l] * QTest[idxMaxEst[l]] p = PI() wellCond, msg = p.setupByInterpolation(musT, errT, self.E + 1, self.polybasis, self.verbosity >= 15, True, {}, {"rcond": self.interpRcond}) err += np.abs(p(musC)) errT[len(self.mus) + l] = 0. err /= QTest vbMng(self, "MAIN", msg, 15) return err, idxMaxEst def getErrorEstimatorNone(self, mus:Np1D) -> Np1D: """EIM-based residual estimator.""" err = np.max(self._EIMStep(mus, True), axis = 1) err *= self.greedyTol / np.mean(err) return err def _EIMStep(self, mus:Np1D, only_one : bool = False) -> Tuple[Np1D, Np1D, List[int]]: """Residual estimator based on look-ahead idea.""" mus = checkParameterList(mus, self.npar)[0] verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) QTzero = np.where(QTest == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTest[QTzero] = np.finfo(np.complex).eps / (1. + self.N) QTest = np.abs(QTest) muCTest = self.trainedModel.centerNormalize(mus) muCTrain = self.trainedModel.centerNormalize(self.mus) vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis) vanTestNext = self._polyvanderAuxiliary(muCTest, self.E + 1, self.polybasis)[:, vanTest.shape[1] :] idxsTest = np.arange(vanTestNext.shape[1]) basis = np.zeros((len(idxsTest), 0), dtype = float) idxMaxEst = [] while len(idxsTest) > 0: vanTrial = self._polyvanderAuxiliary(muCTrain, self.E, self.polybasis) vanTrialNext = self._polyvanderAuxiliary(muCTrain, self.E + 1, self.polybasis)[:, vanTrial.shape[1] :] vanTrial = np.hstack((vanTrial, vanTrialNext.dot(basis).reshape( len(vanTrialNext), basis.shape[1]))) valuesTrial = vanTrialNext[:, idxsTest] vanTestEff = np.hstack((vanTest, vanTestNext.dot(basis).reshape( len(vanTestNext), basis.shape[1]))) vanTestNextEff = vanTestNext[:, idxsTest] try: coeffTest = np.linalg.solve(vanTrial, valuesTrial) except np.linalg.LinAlgError as e: raise RROMPyException(e) errTest = (np.abs(vanTestNextEff - vanTestEff.dot(coeffTest)) / np.expand_dims(QTest, 1)) if only_one: self.trainedModel.verbosity = verb return errTest idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) idxMaxEst += [idxMaxErr[0]] muCTrain.append(muCTest[idxMaxErr[0]]) basis = np.pad(basis, [(0, 0), (0, 1)], "constant") basis[idxsTest[idxMaxErr[1]], -1] = 1. idxsTest = np.delete(idxsTest, idxMaxErr[1]) self.trainedModel.verbosity = verb return errTest, QTest, idxMaxEst def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: """Standard residual-based error estimator.""" setupOK = self.setupApproxLocal() if setupOK > 0: err = np.empty(len(mus)) err[:] = np.nan if not return_max: return err return err, - setupOK, np.nan mus = checkParameterList(mus, self.npar)[0] vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) if self.errorEstimatorKind == "AFFINE": err = self.getErrorEstimatorAffine(mus) else: self._setupInterpolationIndices() if self.errorEstimatorKind == "DISCREPANCY": err = self.getErrorEstimatorDiscrepancy(mus) elif self.errorEstimatorKind[: 10] == "LOOK_AHEAD": err, idxMaxEst = self.getErrorEstimatorLookAhead(mus, self.errorEstimatorKind[11 :]) else: #if self.errorEstimatorKind == "NONE": err = self.getErrorEstimatorNone(mus) vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if not return_max: return err if self.errorEstimatorKind[: 10] != "LOOK_AHEAD": idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) errCP = copy(err) for j in range(self.sampleBatchSize): k = np.argmax(errCP) idxMaxEst[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) errCP *= np.linalg.norm(musZero.data, axis = 1) return err, idxMaxEst, err[idxMaxEst] def plotEstimator(self, *args, **kwargs): super().plotEstimator(*args, **kwargs) if self.errorEstimatorKind == "NONE": vbMng(self, "MAIN", ("Warning! Error estimator has been arbitrarily normalized " "before plotting."), 15) def greedyNextSample(self, *args, **kwargs) -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") self.sampleBatchIdx += 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) err, muidx, maxErr, muNext = super().greedyNextSample(*args, **kwargs) if maxErr is not None and (np.any(np.isnan(maxErr)) or np.any(np.isinf(maxErr))): self.sampleBatchIdx -= 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) if (self.errorEstimatorKind == "NONE" and not np.isnan(maxErr) and not np.isinf(maxErr)): maxErr = None return err, muidx, maxErr, muNext def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return S = self.S self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0 nextBatchSize = 1 while self._S + nextBatchSize <= S: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize self._S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) super()._preliminaryTraining() def setupApproxLocal(self) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") self.verbosity -= 10 vbMng(self, "INIT", "Setting up local approximant.", 5) pMat = self.samplingEngine.samples.data pMatEff = dot(self.HFEngine.C, pMat) if self.approx_state else pMat if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.mus = copy(self.mus) self.catchInstability = 2 unstable = False if self.E > 0: try: Q = self._setupDenominator()[0] except RROMPyException as RE: RROMPyWarning("Downgraded {}: {}".format(RE.__class__.__name__, RE)) vbMng(self, "DEL", "", 7) unstable = True else: Q = PI() Q.coeffs = np.ones((1,) * self.npar, dtype = np.complex) Q.npar = self.npar Q.polybasis = self.polybasis if not unstable: self.trainedModel.data.Q = copy(Q) try: P = copy(self._setupNumerator()) except RROMPyException as RE: RROMPyWarning("Downgraded {}: {}".format(RE.__class__.__name__, RE)) vbMng(self, "DEL", "", 7) unstable = True if not unstable: self.trainedModel.data.P = copy(P) self.trainedModel.data.approxParameters = copy( self.approxParameters) vbMng(self, "DEL", "Done setting up local approximant.", 5) self.catchInstability = 0 self.verbosity += 10 return 1 * unstable def setupApprox(self, plotEst : str = "NONE") -> int: val = super().setupApprox(plotEst) if val == 0: self._iterCorrector() self.trainedModel.data.approxParameters = copy( self.approxParameters) return val def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" super().loadTrainedModel(filename) self.sampleBatchIdx, self.sampleBatchSize, _S = -1, 0, 0 nextBatchSize = 1 while _S + nextBatchSize <= self.S + 1: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize _S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) diff --git a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py index 0c03679..1965383 100644 --- a/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py +++ b/rrompy/reduction_methods/standard/trained_model/trained_model_rational.py @@ -1,177 +1,178 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from rrompy.reduction_methods.base.trained_model.trained_model import ( TrainedModel) from rrompy.utilities.base.types import (Np1D, List, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.exception_manager import RROMPyException, RROMPyWarning from rrompy.parameter import (checkParameter, checkParameterList, emptyParameterList) from rrompy.sampling import sampleList __all__ = ['TrainedModelRational'] class TrainedModelRational(TrainedModel): """ ROM approximant evaluation for rational approximant. Attributes: Data: dictionary with all that can be pickled. """ def centerNormalize(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if mu0 is None: mu0 = self.data.mu0 rad = ((mu ** self.data.rescalingExp - mu0 ** self.data.rescalingExp) / self.data.scaleFactor) return rad def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] vbMng(self, "INIT", "Evaluating numerator at mu = {}.".format(mu), 17) muCenter = self.centerNormalize(mu) p = sampleList(self.data.P(muCenter)) vbMng(self, "DEL", "Done evaluating numerator.", 17) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ mu = checkParameterList(mu, self.data.npar)[0] vbMng(self, "INIT", "Evaluating denominator at mu = {}.".format(mu), 17) muCenter = self.centerNormalize(mu) q = self.data.Q(muCenter, der, scl) vbMng(self, "DEL", "Done evaluating denominator.", 17) return q def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) QV = self.getQVal(mu) QVzero = np.where(QV == 0.)[0] if len(QVzero) > 0: RROMPyWarning(("Adjusting approximation to avoid division by " "numerically zero denominator.")) QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0]) self.uApproxReduced = self.getPVal(mu) / QV vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) mVals[rDim] = self.data.mu0(rDim) mVals = self.centerNormalize(checkParameter(mVals, len(mVals))) mVals = list(mVals.data.flatten()) mVals[rDim] = fp return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim] + self.data.scaleFactor[rDim] * self.data.Q.roots(mVals), 1. / self.data.rescalingExp[rDim]) def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) poles = emptyParameterList() poles.reset((len(pls), self.data.npar), dtype = pls.dtype) for k, pl in enumerate(pls): poles[k] = mVals poles.data[k, rDim] = pl QV = self.getQVal(poles, list(1 * (np.arange(self.data.npar) == rDim))) QVzero = np.where(QV == 0.)[0] if len(QVzero) > 0: RROMPyWarning(("Adjusting residuals to avoid division by " "numerically zero denominator.")) QV[QVzero] = np.finfo(np.complex).eps / (1. + self.data.Q.deg[0]) - res = self.data.projMat.dot(self.getPVal(poles).data) / QV + Res = self.getPVal(poles).data + res = self.data.projMat[:, : Res.shape[0]].dot(Res) / QV return pls, res