diff --git a/rrompy/hfengines/base/hfengine_base.py b/rrompy/hfengines/base/hfengine_base.py index dd46957..f2c94c2 100644 --- a/rrompy/hfengines/base/hfengine_base.py +++ b/rrompy/hfengines/base/hfengine_base.py @@ -1,284 +1,285 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod import numpy as np +import scipy.sparse as scsp from numbers import Number from copy import copy as softcopy from rrompy.utilities.base.types import (Np1D, Np2D, List, DictAny, paramVal, paramList, sampList) from rrompy.utilities.numerical import (solve as tsolve, dot, customPInv, rayleighQuotientIteration) from rrompy.utilities.exception_manager import RROMPyAssert from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import sampleList, emptySampleList from rrompy.solver import setupSolver __all__ = ['HFEngineBase'] class HFEngineBase: """Generic solver for parametric problems.""" def __init__(self, verbosity : int = 10, timestamp : bool = True): self.verbosity = verbosity self.timestamp = timestamp self.setSolver("SPSOLVE", {"use_umfpack" : False}) self.npar = 0 self._C = None self.outputNormMatrix = 1. def name(self) -> str: return self.__class__.__name__ def __str__(self) -> str: return self.name() def __repr__(self) -> str: return self.__str__() + " at " + hex(id(self)) def __dir_base__(self): return [x for x in self.__dir__() if x[:2] != "__"] def __deepcopy__(self, memo): return softcopy(self) @property def npar(self): """Value of npar.""" return self._npar @npar.setter def npar(self, npar): nparOld = self._npar if hasattr(self, "_npar") else -1 if npar != nparOld: self.rescalingExp = [1.] * npar self._npar = npar @property @abstractmethod def spacedim(self): return 1 def checkParameter(self, mu:paramVal): return checkParameter(mu, self.npar) def checkParameterList(self, mu:paramList): return checkParameterList(mu, self.npar) def buildEnergyNormForm(self): """ Build sparse matrix (in CSR format) representative of scalar product. """ self.energyNormMatrix = 1. def buildEnergyNormDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product. """ self.energyNormDualMatrix = 1. def buildEnergyNormPartialDualForm(self): """ Build sparse matrix (in CSR format) representative of dual scalar product without duality. """ self.energyNormPartialDualMatrix = 1. 
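The three buildEnergyNorm*Form hooks above default the Gram matrices to the
scalar 1., i.e. the plain Euclidean product. A concrete engine is expected to
override them with an assembled sparse matrix. A minimal sketch, assuming a
hypothetical 1D finite-difference engine of size n (not part of this patch):

    import numpy as np
    import scipy.sparse as scsp

    class FDEngine(HFEngineBase):
        """Hypothetical engine with a tridiagonal energy product."""
        def __init__(self, n:int, **kwargs):
            super().__init__(**kwargs)
            self.n = n

        @property
        def spacedim(self):
            return self.n

        def buildEnergyNormForm(self):
            # stiffness-like tridiagonal matrix, stored in CSR format
            d = 2. * np.ones(self.n)
            o = - np.ones(self.n - 1)
            self.energyNormMatrix = scsp.diags([o, d, o], [-1, 0, 1],
                                               format = "csr")

innerProduct and norm below then pick this matrix up automatically whenever
is_state is True and dual is False; the A and b assembly methods, omitted
here, would still have to be provided by the subclass.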
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False, dual : bool = False, is_state : bool = True) -> Np2D: """Scalar product.""" if is_state or self.isCEye: if dual: if not hasattr(self, "energyNormPartialDualMatrix"): self.buildEnergyNormPartialDualForm() energyMat = self.energyNormPartialDualMatrix else: if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() energyMat = self.energyNormMatrix else: energyMat = self.outputNormMatrix if not isinstance(u, (np.ndarray,)): u = u.data if not isinstance(v, (np.ndarray,)): v = v.data if onlyDiag: return np.sum(dot(energyMat, u) * v.conj(), axis = 0) return dot(dot(energyMat, u).T, v.conj()).T def norm(self, u:Np2D, dual : bool = False, is_state : bool = True) -> Np1D: return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual, is_state = is_state)) ** .5 def baselineA(self): """Return 0 of shape consistent with operator of linear system.""" d = self.As[0].shape[0] return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)), shape = (d, d), dtype = np.complex) def baselineb(self): """Return 0 of shape consistent with RHS of linear system.""" return np.zeros(self.spacedim, dtype = np.complex) @abstractmethod def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D: """ Assemble terms of operator of linear system and return it (or its derivative) at a given parameter. """ return @abstractmethod def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D: """ Assemble terms of RHS of linear system and return it (or its derivative) at a given parameter. """ return @property def C(self): """Value of C.""" if self._C is None: self._C = 1. return self._C @property def isCEye(self): return isinstance(self.C, Number) def applyC(self, u:sampList): """Apply LHS of linear system.""" return dot(self.C, u) def applyCpInv(self, u:sampList): """Apply pseudoinverse of LHS of linear system.""" return dot(customPInv(self.C), u) _isStateShiftZero = True def stateShift(self, mu : paramVal = []) -> Np1D: return np.zeros((self.spacedim, len(mu))) _isOutputShiftZero = True def outputShift(self, mu : paramVal = []) -> Np1D: return self.applyC(self.stateShift(mu)) def setSolver(self, solverType:str, solverArgs : DictAny = {}): """Choose solver type and parameters.""" self._solver, self._solverArgs = setupSolver(solverType, solverArgs) def solve(self, mu : paramList = [], RHS : sampList = None, return_state : bool = False, verbose : bool = False) -> sampList: """ Find solution of linear system. Args: mu: parameter value. RHS: RHS of linear system. If None, defaults to that of parametric system. Defaults to None. return_state: whether to return state before multiplication by c. Defaults to False. verbose: whether to notify for each solution computed. Defaults to False. """ if mu == []: mu = self.mu0 mu = self.checkParameterList(mu)[0] if self.npar == 0: mu.reset((1, self.npar), mu.dtype) if len(mu) == 0: return emptySampleList() if RHS is None: RHS = [self.b(m) for m in mu] RHS = sampleList(RHS) mult = 0 if len(RHS) == 1 else 1 RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size") for j in range(len(mu)): u = tsolve(self.A(mu[j]), RHS[mult * j], self._solver, self._solverArgs) if return_state: if j == 0: sol = emptySampleList() sol.reset((len(u), len(mu)), dtype = u.dtype) sol[j] = u else: if j == 0: sol = np.empty((len(u), len(mu)), dtype = u.dtype) sol[:, j] = u if verbose: print("." 
* (j % 5 != 4) + "*" * (j % 5 == 4), end = "") if not return_state: sol = sampleList(self.applyC(sol) - self.outputShift(mu)) else: sol = sampleList(sol - self.stateShift(mu)) if verbose: print() return sol def residual(self, mu : paramList = [], u : sampList = None, post_c : bool = True) -> sampList: """ Find residual of linear system for given approximate solution. Args: mu: parameter value. u: numpy complex array with function dofs. If None, set to 0. post_c: whether to post-process using c. Defaults to True. """ if mu == []: mu = self.mu0 mu = self.checkParameterList(mu)[0] if self.npar == 0: mu.reset((1, self.npar), mu.dtype) if len(mu) == 0: return emptySampleList() v = sampleList(self.stateShift(mu)) if u is not None: v = v + sampleList(u) for j in range(len(mu)): r = self.b(mu[j]) - dot(self.A(mu[j]), v[j]) if post_c: if j == 0: res = np.empty((len(r), len(mu)), dtype = r.dtype) res[:, j] = r else: if j == 0: res = emptySampleList() res.reset((len(r), len(mu)), dtype = r.dtype) res[j] = r if post_c: res = sampleList(self.applyC(res)) return res def stabilityFactor(self, mu : paramList = [], u : sampList = None, nIterP : int = 10, nIterR : int = 10) -> sampList: """ Find stability factor of matrix of linear system using iterative inverse power iteration- and Rayleigh quotient-based procedure. Args: mu: parameter values. u: numpy complex arrays with function dofs. nIterP: number of iterations of power method. nIterR: number of iterations of Rayleigh quotient method. """ if mu == []: mu = self.mu0 mu = self.checkParameterList(mu)[0] if self.npar == 0: mu.reset((1, self.npar), mu.dtype) u = sampleList(u) solShift = self.stateShift(mu) if len(u) == len(mu): u = u + solShift else: u = sampleList(solShift) + np.tile(u.data, (1, len(mu))) stabFact = np.empty(len(mu), dtype = float) if not hasattr(self, "energyNormMatrix"): self.buildEnergyNormForm() for j in range(len(mu)): stabFact[j] = rayleighQuotientIteration(self.A(mu[j]), u[j], self.energyNormMatrix, 0., nIterP, nIterR, self._solver, self._solverArgs) return stabFact diff --git a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py index f8bf8ee..aa2d507 100644 --- a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py +++ b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py @@ -1,685 +1,689 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . 
# from copy import deepcopy as copy import numpy as np from rrompy.reduction_methods.standard.generic_standard_approximant \ import GenericStandardApproximant from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple, List, normEng, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.expression import expressionEvaluator from rrompy.solver import normEngine from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList, emptyParameterList __all__ = ['GenericGreedyApproximant'] def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D: return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)]) - badmus[..., np.newaxis].T, axis = 1) def pruneSamples(mus:paramList, badmus:paramList, tol : float = 1e-8) -> Np1D: """Remove from mus all the elements which are too close to badmus.""" if len(badmus) == 0: return mus proximity = np.min(localL2Distance(mus.data, badmus.data), axis = 1) return np.arange(len(mus))[proximity <= tol] class GenericGreedyApproximant(GenericStandardApproximant): """ ROM greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: Uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. 
interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ TOL_INSTABILITY = 1e-6 def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["greedyTol", "collinearityTol", "interactive", "maxIter", "refinementRatio", "nTestPoints"], [1e-2, 0., False, 1e2, .2, 5e2], ["trainSetGenerator"], ["AUTO"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def collinearityTol(self): """Value of collinearityTol.""" return self._collinearityTol @collinearityTol.setter def collinearityTol(self, collinearityTol): if collinearityTol < 0: raise RROMPyException("collinearityTol must be non-negative.") if (hasattr(self, "_collinearityTol") and self.collinearityTol is not None): collinearityTolold = self.collinearityTol else: collinearityTolold = -1 self._collinearityTol = collinearityTol self._approxParameters["collinearityTol"] = self.collinearityTol if collinearityTolold != self.collinearityTol: self.resetSamples() @property def interactive(self): """Value of interactive.""" return self._interactive @interactive.setter def interactive(self, interactive): self._interactive = interactive @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def refinementRatio(self): """Value of refinementRatio.""" return self._refinementRatio @refinementRatio.setter def refinementRatio(self, refinementRatio): if refinementRatio <= 0. 
or refinementRatio > 1.: raise RROMPyException(("refinementRatio must be between 0 " "(excluded) and 1.")) if (hasattr(self, "_refinementRatio") and self.refinementRatio is not None): refinementRatioold = self.refinementRatio else: refinementRatioold = -1 self._refinementRatio = refinementRatio self._approxParameters["refinementRatio"] = self.refinementRatio if refinementRatioold != self.refinementRatio: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def trainSetGenerator(self): """Value of trainSetGenerator.""" return self._trainSetGenerator @trainSetGenerator.setter def trainSetGenerator(self, trainSetGenerator): if (isinstance(trainSetGenerator, (str,)) and trainSetGenerator.upper() == "AUTO"): trainSetGenerator = self.sampler if 'generatePoints' not in dir(trainSetGenerator): raise RROMPyException("trainSetGenerator type not recognized.") if (hasattr(self, '_trainSetGenerator') and self.trainSetGenerator not in [None, "AUTO"]): trainSetGeneratorOld = self.trainSetGenerator self._trainSetGenerator = trainSetGenerator self._approxParameters["trainSetGenerator"] = self.trainSetGenerator if (not 'trainSetGeneratorOld' in locals() or trainSetGeneratorOld != self.trainSetGenerator): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = emptyParameterList() def initEstimatorNormEngine(self, normEngn : normEng = None): """Initialize estimator norm engine.""" if (normEngn is not None or not hasattr(self, "estimatorNormEngine") or self.estimatorNormEngine is None): if normEngn is None: if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"): self.HFEngine.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( self.HFEngine.energyNormPartialDualMatrix) else: if hasattr(normEngn, "buildEnergyNormPartialDualForm"): if not hasattr(normEngn, "energyNormPartialDualMatrix"): normEngn.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( normEngn.energyNormPartialDualMatrix) else: estimatorEnergyMatrix = normEngn self.estimatorNormEngine = normEngine(estimatorEnergyMatrix) def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \ -> Tuple[Np1D, Np1D, Np1D]: self.assembleReducedResidualBlocks(full = True) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0) if rA is None: return ff # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2) * rb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2) * rA.conj(), axis = (0, 1)) return ff, Lf, LL def errorEstimator(self, mus:Np1D) -> Np1D: """Standard residual-based error estimator.""" self.setupApprox() + self.HFEngine.buildA() + self.HFEngine.buildb() mus = checkParameterList(mus, self.npar)[0] vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 
10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 uApproxRs = self.getApproxReduced(mus) muTestEff = mus ** self.HFEngine.rescalingExp radiusA = np.empty((len(self.HFEngine.thAs), len(mus)), dtype = np.complex) radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): radiusA[j] = expressionEvaluator(thA[0], muTestEff) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) radiusA = np.expand_dims(uApproxRs.data, 1) * radiusA ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) return err def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: """ Compute maximum of (and index of maximum of) error estimator over given parameters. """ errorEstTest = self.errorEstimator(mus) idxMaxEst = [np.argmax(errorEstTest)] return errorEstTest, idxMaxEst, errorEstTest[idxMaxEst] def _isLastSampleCollinear(self) -> bool: """Check collinearity of last sample.""" if self.collinearityTol <= 0.: return False if self.POD: reff = self.samplingEngine.RPOD[:, -1] else: RROMPyWarning(("Repeated orthogonalization of the samples for " "collinearity check. Consider setting POD to " "True.")) if not hasattr(self, "_PODEngine"): from rrompy.sampling.base.pod_engine import PODEngine self._PODEngine = PODEngine(self.HFEngine) reff = self._PODEngine.generalizedQR(self.samplingEngine.samples, only_R = True, is_state = True)[:, -1] cLevel = np.abs(reff[-1]) / np.linalg.norm(reff) vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 5) return cLevel < self.collinearityTol def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") mus = copy(self.muTest[muidx]) self.muTest.pop(muidx) for mu in mus: vbMng(self, "MAIN", ("Adding sample point no. {} at {} to training " "set.").format(self.samplingEngine.nsamples + 1, mu), 2) self.mus.append(mu) self._S = len(self.mus) self._approxParameters["S"] = self.S self.samplingEngine.nextSample(mu) if self._isLastSampleCollinear(): RROMPyWarning("Collinearity above tolerance detected.") errorEstTest = np.empty(len(self.muTest)) errorEstTest[:] = np.nan return errorEstTest, [-1], np.nan, np.nan errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator( self.muTest) if (plotEst and not np.any(np.isnan(errorEstTest)) and not np.any(np.isinf(errorEstTest))): musre = copy(self.muTest.re.data) from matplotlib import pyplot as plt plt.figure() errCP = copy(errorEstTest) while len(musre) > 0: if self.npar == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, 1 :] - musre[0, 1 :]), 1), 0.))[0] plt.semilogy(musre[currIdx, 0], errCP[currIdx], 'k', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) plt.semilogy([self.muTest.re(0, 0), self.muTest.re(-1, 0)], [self.greedyTol] * 2, 'r--') plt.semilogy(self.mus.re(0), 2. 
* self.greedyTol * np.ones(len(self.mus)), '*m') plt.semilogy(self.muTest.re(muidx, 0), maxErrorEst, 'xr') plt.grid() plt.show() plt.close() return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self.computeScaleFactor() self.resetSamples() self.mus = self.trainSetGenerator.generatePoints(self.S)[ list(range(self.S))] muTestBase = self.sampler.generatePoints(self.nTestPoints) idxPop = pruneSamples(muTestBase ** self.HFEngine.rescalingExp, self.mus ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestBase.pop(idxPop) muTestBase = muTestBase.sort() muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 2) self.samplingEngine.iterSample(self.mus) self._S = len(self.mus) self._approxParameters["S"] = self.S self.muTest = emptyParameterList() self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) self.muTest[: -1] = muTestBase.data self.muTest[-1] = muLast.data def _enrichTestSet(self, nTest:int): """Add extra elements to test set.""" RROMPyAssert(self._mode, message = "Cannot enrich test set.") muTestExtra = self.sampler.generatePoints(2 * nTest) muTotal = copy(self.mus) muTotal.append(self.muTest) idxPop = pruneSamples(muTestExtra ** self.HFEngine.rescalingExp, muTotal ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestExtra.pop(idxPop) muTestNew = np.empty((len(self.muTest) + len(muTestExtra), self.muTest.shape[1]), dtype = np.complex) muTestNew[: len(self.muTest)] = self.muTest.data muTestNew[len(self.muTest) :] = muTestExtra.data self.muTest = checkParameterList(muTestNew, self.npar)[0].sort() vbMng(self, "MAIN", "Enriching test set by {} elements.".format(len(muTestExtra)), 5) def greedy(self, plotEst : bool = False): """Compute greedy snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return vbMng(self, "INIT", "Starting computation of snapshots.", 2) self._preliminaryTraining() nTest = self.nTestPoints muT0 = copy(self.muTest[-1]) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( [len(self.muTest) - 1], plotEst) if np.any(np.isnan(maxErrorEst)): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop termination.")) self.muTest.append(muT0) self.mus.pop(-1) self.samplingEngine.popSample() self.setupApprox() else: vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(np.max(maxErrorEst)), 2) trainedModelOld = copy(self.trainedModel) while (self.samplingEngine.nsamples < self.maxIter and np.max(maxErrorEst) > self.greedyTol): if (1. - self.refinementRatio) * nTest > len(self.muTest): self._enrichTestSet(nTest) nTest = len(self.muTest) muTestOld, maxErrorEstOld = self.muTest, np.max(maxErrorEst) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( muidx, plotEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(np.max(maxErrorEst)), 2) if (np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst)) or maxErrorEstOld < (np.max(maxErrorEst) * self.TOL_INSTABILITY)): RROMPyWarning(("Instability in a posteriori estimator. 
" "Starting preemptive greedy loop " "termination.")) self.muTest = muTestOld self._approxParameters = ( trainedModelOld.data.approxParameters) self._S -= 1 self.mus.pop(-1) self.samplingEngine.popSample() self.trainedModel.data = copy(trainedModelOld.data) break trainedModelOld.data = copy(self.trainedModel.data) if (self.interactive and np.max(maxErrorEst) <= self.greedyTol): vbMng(self, "MAIN", ("Required tolerance {} achieved. Want to decrease " "greedyTol and continue? " "Y/N").format(self.greedyTol), 0, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": vbMng(self, "MAIN", "Reducing value of greedyTol...", 0) while np.max(maxErrorEst) <= self._greedyTol: self._greedyTol *= .5 if (self.interactive and self.samplingEngine.nsamples >= self.maxIter): vbMng(self, "MAIN", ("Maximum number of iterations {} reached. Want to " "increase maxIter and continue? " "Y/N").format(self.maxIter), 0, end = "") increasemaxIter = input() if increasemaxIter.upper() == "Y": vbMng(self, "MAIN", "Doubling value of maxIter...", 0) self._maxIter *= 2 vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(self.samplingEngine.nsamples), 2) def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return (super().checkComputedApprox() and len(self.mus) == self.trainedModel.data.projMat.shape[1]) def assembleReducedResidualGramian(self, pMat:sampList): """ Build residual gramian of reduced linear system through projections. """ self.initEstimatorNormEngine() if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.estimatorNormEngine.innerProduct(pMat, pMat) else: Sold = self.trainedModel.data.gramian.shape[0] S = len(self.mus) if Sold > S: gramian = self.trainedModel.data.gramian[: S, : S] else: idxOld = list(range(Sold)) idxNew = list(range(Sold, S)) gramian = np.empty((S, S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxOld))) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxNew))) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D]): """ Build blocks (of type bb) of reduced linear system through projections. """ self.initEstimatorNormEngine() nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = bs[i] resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi) for j in range(i): Mbj = bs[j] resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj, Mbi) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:sampList): """ Build blocks (of type Ab) of reduced linear system through projections. 
""" self.initEstimatorNormEngine() nAs = len(As) nbs = len(bs) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) for j in range(nAs): MAj = dot(As[j], pMat) for i in range(nbs): Mbi = bs[i] resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj, Mbi) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == S: return if Sold > S: resAb = self.trainedModel.data.resAb[:, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): MAj = dot(As[j], pMat[:, Sold :]) for i in range(nbs): Mbi = bs[i] resAb[i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj, Mbi)) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList): """ Build blocks (of type AA) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) for i in range(nAs): MAi = dot(As[i], pMat) resAA[:, i, :, i] = ( self.estimatorNormEngine.innerProduct(MAi, MAi)) for j in range(i): MAj = dot(As[j], pMat) resAA[:, i, :, j] = ( self.estimatorNormEngine.innerProduct(MAj, MAi)) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == S: return if Sold > S: resAA = self.trainedModel.data.resAA[: S, :, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): MAi = dot(As[i], pMat) resAA[: Sold, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) for j in range(i): MAj = dot(As[j], pMat) resAA[: Sold, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, : Sold], MAi[:, Sold :])) resAA[Sold :, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, Sold :])) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of affine decomposition of residual.""" + self.HFEngine.buildb() self.assembleReducedResidualBlocksbb(self.HFEngine.bs) if full: pMat = self.samplingEngine.samples + self.HFEngine.buildA() self.assembleReducedResidualBlocksAb(self.HFEngine.As, self.HFEngine.bs, pMat) self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat) diff --git a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py index 7f66dab..956d3b7 100644 --- 
a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py @@ -1,527 +1,530 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_greedy_approximant import (GenericGreedyApproximant, localL2Distance as lL2D) from rrompy.utilities.poly_fitting.polynomial import (polybases, polydomcoeff, PolynomialInterpolator as PI, polyvanderTotal as pvT) from rrompy.utilities.numerical import totalDegreeN, dot from rrompy.utilities.expression import expressionEvaluator from rrompy.reduction_methods.standard import RationalInterpolant from rrompy.utilities.base.types import (Np1D, Tuple, DictAny, HFEng, paramVal, paramList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting import customFit from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert, RROMPy_FRAGILE) from rrompy.parameter import checkParameterList __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'polybasis': type of basis for interpolation; defaults to 'MONOMIAL'; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'INTERPOLATORY', 'EIM_INTERPOLATORY', and 'EIM_DIAGONAL'; defaults to 'AFFINE'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. 
parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'interpRcond': tolerance for interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. robustTol: tolerance for robust rational denominator management. errorEstimatorKind: kind of error estimator. interpRcond: tolerance for interpolation. robustTol: tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. 
""" _allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "INTERPOLATORY", "EIM_INTERPOLATORY", "EIM_DIAGONAL"] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList(["errorEstimatorKind"], ["AFFINE"], toBeExcluded = ["M", "N", "polydegreetype", "radialDirectionalWeights", "nNearestNeighbor"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def E(self): """Value of E.""" self._E = self.sampleBatchIdx - 1 return self._E @E.setter def E(self, E): RROMPyWarning(("E is used just to simplify inheritance, and its value " "cannot be changed from that of sampleBatchIdx - 1.")) @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. 
Overriding " "to 'AFFINE'.")) errorEstimatorKind = "AFFINE" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind def errorEstimator(self, mus:Np1D) -> Np1D: """Standard residual-based error estimator.""" if self.errorEstimatorKind == "AFFINE": return super().errorEstimator(mus) setupOK = self.setupApprox() if not setupOK: err = np.empty(len(mus)) err[:] = np.nan return err if self.errorEstimatorKind == "DIAGONAL": return self.errorEstimatorEIM(mus) self._setupInterpolationIndices() mus = checkParameterList(mus, self.npar)[0] muCTest = self.trainedModel.centerNormalize(mus) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) if self.errorEstimatorKind == "DISCREPANCY": - nAs, nbs = len(self.HFEngine.thAs), len(self.HFEngine.thbs) + self.HFEngine.buildA() + self.HFEngine.buildb() + nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs muTrainEff = self.mus ** self.HFEngine.rescalingExp muTestEff = mus ** self.HFEngine.rescalingExp PTrain = self.trainedModel.getPVal(self.mus).data.T QTrain = self.trainedModel.getQVal(self.mus) PTest = self.trainedModel.getPVal(mus).data radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), dtype = np.complex) radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) radiusb = np.empty((nbs, len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): idxs = j * self.S + np.arange(self.S) radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], muTrainEff, (self.S, 1)) * PTrain radiusA[:, j] = PTest * expressionEvaluator(thA[0], muTestEff, (len(mus),)) for j, thb in enumerate(self.HFEngine.thbs): idx = nAs * self.S + j radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], muTrainEff, (self.S,)) radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, (len(mus),)) QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) vanTrain, _, vanTrainIdxs = pvT(self._musUniqueCN, self.E, self.polybasis0, self._derIdxs, self._reorder) interpPQ = customFit(vanTrain[:, vanTrainIdxs], radiusAbTrain, rcond = self.interpRcond) vanTest, _, vanTestIdxs = pvT(muCTest, self.E, self.polybasis0) DradiusAb = vanTest[:, vanTestIdxs].dot(interpPQ) radiusA = (radiusA - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) radiusb = radiusb - DradiusAb[:, - nbs :].T ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. 
* np.real(Lf) + ff) / QRHSNorm2) ** .5 else: #if self.errorEstimatorKind == "INTERPOLATORY": muCTrain = self.trainedModel.centerNormalize(self.mus) samplingRatio = np.prod(lL2D(muCTest.data, muCTrain.data), axis = 1) / np.abs(QTest) self.initEstimatorNormEngine() QTest = np.abs(QTest) sampRCP = copy(samplingRatio) idx_muTestSample = np.empty(self.sampleBatchSize, dtype = int) for j in range(self.sampleBatchSize): k = np.argmax(sampRCP) idx_muTestSample[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) sampRCP *= np.linalg.norm(musZero.data, axis = 1) mu_muTestSample = mus[idx_muTestSample] app_muTestSample = self.getApproxReduced(mu_muTestSample) if self._mode == RROMPy_FRAGILE: if not self.HFEngine.isCEye: raise RROMPyException(("Cannot compute INTERPOLATORY " "residual estimator in fragile " "mode for non-scalar C.")) app_muTestSample = dot(self.trainedModel.data.projMat, app_muTestSample.data) else: app_muTestSample = dot(self.samplingEngine.samples, app_muTestSample) resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, post_c = False) RHSmu = self.HFEngine.residual(mu_muTestSample, None, post_c = False) ressamples = (self.estimatorNormEngine.norm(resmu) / self.estimatorNormEngine.norm(RHSmu)) musT = copy(self.mus) musT.append(mu_muTestSample) musT = self.trainedModel.centerNormalize(musT) musC = self.trainedModel.centerNormalize(mus) resT = np.zeros(len(musT), dtype = np.complex) err = np.zeros(len(mus)) for l in range(len(mu_muTestSample)): resT[len(self.mus) + l] = (ressamples[l] * QTest[idx_muTestSample[l]]) p = PI() wellCond, msg = p.setupByInterpolation(musT, resT, self.E + 1, self.polybasis, self.verbosity >= 15, True, {}, {"rcond": self.interpRcond}) err += np.abs(p(musC)) resT[len(self.mus) + l] = 0. 
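    # note: the entry for candidate l is zeroed again here, so each pass of
    # the loop above interpolates a residual indicator supported at a single
    # test candidate; the accumulated |p| values are rescaled by QTest below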
err /= QTest vbMng(self, "MAIN", msg, 15) self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) return err def errorEstimatorEIM(self, mus:Np1D, return_max_idxs : bool = False) -> Np1D: """EIM-like interpolation error estimator.""" setupOK = self.setupApprox() if not setupOK: err = np.empty(len(mus)) err[:] = np.nan return err self._setupInterpolationIndices() mus = checkParameterList(mus, self.npar)[0] vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) muCTest = self.trainedModel.centerNormalize(mus) muCTrain = self.trainedModel.centerNormalize(self.mus) vanderTest, _, vanderTestR = pvT(muCTest, self.E, self.polybasis) vanderTest = vanderTest[:, vanderTestR] vanderTestNext, _, vanderTestNextR = pvT(muCTest, self.E + 1, self.polybasis) vanderTestNext = vanderTestNext[:, vanderTestNextR[ vanderTest.shape[1] :]] idxsTest = np.arange(vanderTestNext.shape[1]) basis = np.zeros((len(idxsTest), 0), dtype = float) idxMaxEst = [] err = None while len(idxsTest) > 0: vanderTrial, _, vanderTrialR = pvT(muCTrain, self.E, self.polybasis) vanderTrial = vanderTrial[:, vanderTrialR] vanderTrialNext, _, vanderTrialNextR = pvT(muCTrain, self.E + 1, self.polybasis) vanderTrialNext = vanderTrialNext[:, vanderTrialNextR[ vanderTrial.shape[1] :]] vanderTrial = np.hstack((vanderTrial, vanderTrialNext.dot(basis).reshape( len(vanderTrialNext), basis.shape[1]))) valuesTrial = vanderTrialNext[:, idxsTest] vanderTestEff = np.hstack((vanderTest, vanderTestNext.dot(basis).reshape( len(vanderTestNext), basis.shape[1]))) vanderTestNextEff = vanderTestNext[:, idxsTest] coeffTest = np.linalg.solve(vanderTrial, valuesTrial) errTest = np.abs((vanderTestNextEff - vanderTestEff.dot(coeffTest)) / np.expand_dims(QTest, 1)) idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) idxMaxEst += [idxMaxErr[0]] if err is None: err = np.max(errTest, axis = 1) if not return_max_idxs: break muCTrain.append(muCTest[idxMaxErr[0]]) basis = np.pad(basis, [(0, 0), (0, 1)], "constant") basis[idxsTest[idxMaxErr[1]], -1] = 1. idxsTest = np.delete(idxsTest, idxMaxErr[1]) if self.errorEstimatorKind == "EIM_DIAGONAL": + self.HFEngine.buildb() self.assembleReducedResidualBlocks(full = False) muTestEff = mus ** self.HFEngine.rescalingExp radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) bresb = self._affineResidualMatricesContraction(radiusb) self.assembleReducedResidualGramian(self.trainedModel.data.projMat) pDom = (polydomcoeff(self.E, self.polybasis) * self.trainedModel.data.P[(-1,) + (0,) * (self.npar - 1)]) LL = pDom.conj().dot(self.trainedModel.data.gramian.dot(pDom)) if not hasattr(self, "Anorm2Approx"): if self.HFEngine.nAs > 1: Ader = self.HFEngine.A(self.mu0, [1] + [0] * (self.npar - 1)) try: Adiag = self.scaleFactor[0] * Ader.diagonal() except: Adiag = self.scaleFactor[0] * np.diagonal(Ader) self.Anorm2Approx = np.mean(np.abs(Adiag) ** 2.) if (np.isclose(self.Anorm2Approx, 0.) 
or self.HFEngine.nAs <= 1): self.Anorm2Approx = 1 jOpt = np.abs(self.Anorm2Approx * LL / bresb) ** .5 err = jOpt * err else: #if self.errorEstimatorKind == "EIM_INTERPOLATORY": self.initEstimatorNormEngine() mu_muTestSample = mus[idxMaxEst[0]] app_muTestSample = self.getApproxReduced(mu_muTestSample) if self._mode == RROMPy_FRAGILE: if not self.HFEngine.isCEye: raise RROMPyException(("Cannot compute EIM_INTERPOLATORY " "residual estimator in fragile " "mode for non-scalar C.")) app_muTestSample = dot(self.trainedModel.data.projMat, app_muTestSample.data) else: app_muTestSample = dot(self.samplingEngine.samples, app_muTestSample) resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, post_c = False) RHSmu = self.HFEngine.residual(mu_muTestSample, None, post_c = False) jOpt = np.abs(self.estimatorNormEngine.norm(resmu)[0] / err[idxMaxEst[0]] / self.estimatorNormEngine.norm(RHSmu)[0]) err = jOpt * err self.trainedModel.verbosity = verb vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if return_max_idxs: return err, idxMaxEst return err def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: """ Compute maximum of (and index of maximum of) error estimator over given parameters. """ if self.errorEstimatorKind[: 4] == "EIM_": errorEstTest, idxMaxEst = self.errorEstimatorEIM(mus, True) else: errorEstTest = self.errorEstimator(mus) idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) errCP = copy(errorEstTest) for j in range(self.sampleBatchSize): k = np.argmax(errCP) idxMaxEst[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) errCP *= np.linalg.norm(musZero.data, axis = 1) maxEst = errorEstTest[idxMaxEst] return errorEstTest, idxMaxEst, maxEst def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") self.sampleBatchIdx += 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) return super().greedyNextSample(muidx, plotEst) def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return S = self.S self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0 nextBatchSize = 1 while self._S + nextBatchSize <= S: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize self._S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) super()._preliminaryTraining() def setupApprox(self, plotEst : bool = False): """ Compute rational interpolant. SVD-based robust eigenvalue management. 
""" if self.checkComputedApprox(): return True RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.".format(self.name()), 5) self.greedy(plotEst) self._N, self._M = [self.E] * 2 pMat = self.samplingEngine.samples.data pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.mus = copy(self.mus) self.catchInstability = True if self.N > 0: try: Q = self._setupDenominator()[0] except RROMPyException as RE: RROMPyWarning(RE._msg) vbMng(self, "DEL", "Done setting up approximant.", 5) return False else: Q = PI() Q.coeffs = np.ones(1, dtype = np.complex) Q.npar = 1 Q.polybasis = self.polybasis self.trainedModel.data.Q = copy(Q) try: self.trainedModel.data.P = copy(self._setupNumerator()) except RROMPyException as RE: RROMPyWarning(RE._msg) vbMng(self, "DEL", "Done setting up approximant.", 5) return False self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) return True def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" super().loadTrainedModel(filename) self.sampleBatchIdx, self.sampleBatchSize, _S = -1, 0, 0 nextBatchSize = 1 while _S + nextBatchSize <= self.S + 1: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize _S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) diff --git a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py index 82b22ff..3990b62 100644 --- a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py +++ b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py @@ -1,183 +1,185 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.reduction_methods.standard import ReducedBasis from rrompy.utilities.base.types import DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyAssert __all__ = ['ReducedBasisGreedy'] class ReducedBasisGreedy(GenericGreedyApproximant, ReducedBasis): """ ROM greedy RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. 
approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'interactive': whether to interactively terminate greedy algorithm; defaults to False; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'interactive': whether to interactively terminate greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. interactive: whether to interactively terminate greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix. bs: List of numpy vectors representing coefficients of linear system RHS. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix. 
bRBs: List of numpy vectors representing coefficients of compressed linear system RHS. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList([], [], toBeExcluded = ["R", "PODTolerance"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def R(self): """Value of R.""" self._R = self._S return self._R @R.setter def R(self, R): RROMPyWarning(("R is used just to simplify inheritance, and its value " "cannot be changed from that of S.")) @property def PODTolerance(self): """Value of PODTolerance.""" self._PODTolerance = -1 return self._PODTolerance @PODTolerance.setter def PODTolerance(self, PODTolerance): RROMPyWarning(("PODTolerance is used just to simplify inheritance, " "and its value cannot be changed from -1.")) def setupApprox(self, plotEst : bool = False): """Compute RB projection matrix.""" if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.greedy(plotEst) vbMng(self, "INIT", "Computing projection matrix.", 7) pMat = self.samplingEngine.samples.data pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} data = self.initializeModelData(datadict)[0] ARBs, bRBs = self.assembleReducedSystem(pMat) data.affinePoly = self.HFEngine.affinePoly + self.HFEngine.buildA() + self.HFEngine.buildb() data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs self.trainedModel.data = data else: self.trainedModel = self.trainedModel Sold = self.trainedModel.data.projMat.shape[1] ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMat[:, : Sold]) self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs vbMng(self, "DEL", "Done computing projection matrix.", 7) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5)
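For context, a usage sketch of the greedy drivers touched by this patch. The
engine construction, the sampler import path, and getApprox are assumptions
(any HFEngineBase subclass with an affine decomposition, and any sampler
exposing generatePoints, should work the same way):

    import numpy as np
    from rrompy.reduction_methods.greedy import ReducedBasisGreedy
    # assumed import path for a quadrature-based sample point generator
    from rrompy.parameter.parameter_sampling import QuadratureSampler

    HFEngine = MyAffineEngine()                        # placeholder engine
    sampler = QuadratureSampler([10., 100.], "UNIFORM")
    params = {"POD": True, "S": 5, "sampler": sampler, "greedyTol": 1e-3,
              "maxIter": 50, "nTestPoints": 200}
    approx = ReducedBasisGreedy(HFEngine, mu0 = [50.],
                                approxParameters = params, verbosity = 10)
    approx.setupApprox()    # runs the greedy loop and assembles ARBs and bRBs
    uApp = approx.getApprox(np.linspace(10., 100., 7))

The same parameter dictionary, extended with 'polybasis' and
'errorEstimatorKind', drives RationalInterpolantGreedy instead.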