diff --git a/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py index c37383b..ccd31d9 100644 --- a/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py +++ b/rrompy/reduction_methods/standard/greedy/generic_greedy_approximant.py @@ -1,651 +1,651 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from abc import abstractmethod from copy import deepcopy as copy import numpy as np from matplotlib import pyplot as plt from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from rrompy.reduction_methods.standard.generic_standard_approximant import ( GenericStandardApproximant) from rrompy.utilities.base.types import (Np1D, Np2D, Tuple, List, normEng, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.expression import expressionEvaluator from rrompy.solver import normEngine from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.sampling.sample_list import sampleList from rrompy.parameter import emptyParameterList, parameterList from rrompy.utilities.parallel import masterCore __all__ = ['GenericGreedyApproximant'] def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D: return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)]) - badmus[..., np.newaxis].T, axis = 1) def pruneSamples(mus:paramList, badmus:paramList, tol : float = 1e-8) -> Np1D: """Remove from mus all the elements which are too close to badmus.""" if isinstance(mus, (parameterList, sampleList)): mus = mus.data if isinstance(badmus, (parameterList, sampleList)): badmus = badmus.data if len(badmus) == 0: return np.arange(len(mus)) proximity = np.min(localL2Distance(mus, badmus), axis = 1) return np.where(proximity <= tol)[0] class GenericGreedyApproximant(GenericStandardApproximant): """ ROM greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: number of test points. sampler: Sample point generator. greedyTol: Uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["greedyTol", "collinearityTol", "maxIter", "nTestPoints"], [1e-2, 0., 1e2, 5e2], ["trainSetGenerator"], ["AUTO"]) super().__init__(*args, **kwargs) self._postInit() @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def collinearityTol(self): """Value of collinearityTol.""" return self._collinearityTol @collinearityTol.setter def collinearityTol(self, collinearityTol): if collinearityTol < 0: raise RROMPyException("collinearityTol must be non-negative.") if (hasattr(self, "_collinearityTol") and self.collinearityTol is not None): collinearityTolold = self.collinearityTol else: collinearityTolold = -1 self._collinearityTol = collinearityTol self._approxParameters["collinearityTol"] = self.collinearityTol if collinearityTolold != self.collinearityTol: self.resetSamples() @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def trainSetGenerator(self): """Value of trainSetGenerator.""" return self._trainSetGenerator @trainSetGenerator.setter def trainSetGenerator(self, trainSetGenerator): if (isinstance(trainSetGenerator, (str,)) and trainSetGenerator.upper() == "AUTO"): trainSetGenerator = self.sampler if 'generatePoints' not in dir(trainSetGenerator): raise RROMPyException("trainSetGenerator type not recognized.") if (hasattr(self, '_trainSetGenerator') and self.trainSetGenerator not in [None, "AUTO"]): trainSetGeneratorOld = self.trainSetGenerator self._trainSetGenerator = trainSetGenerator self._approxParameters["trainSetGenerator"] = self.trainSetGenerator if (not 'trainSetGeneratorOld' in locals() or trainSetGeneratorOld != self.trainSetGenerator): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = emptyParameterList() def initEstimatorNormEngine(self, normEngn : normEng = None): """Initialize estimator norm engine.""" if (normEngn is not None or not hasattr(self, "estimatorNormEngine") or self.estimatorNormEngine is None): if normEngn is None: if self.approx_state: if not hasattr(self.HFEngine, "energyNormDualMatrix"): self.HFEngine.buildEnergyNormDualForm() estimatorEnergyMatrix = self.HFEngine.energyNormDualMatrix else: estimatorEnergyMatrix = self.HFEngine.outputNormMatrix else: if hasattr(normEngn, "buildEnergyNormDualForm"): if not hasattr(normEngn, "energyNormDualMatrix"): normEngn.buildEnergyNormDualForm() estimatorEnergyMatrix = normEngn.energyNormDualMatrix else: estimatorEnergyMatrix = normEngn self.estimatorNormEngine = normEngine(estimatorEnergyMatrix) def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \ -> Tuple[Np1D, Np1D, Np1D]: self.assembleReducedResidualBlocks(full = rA is not None) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0) if rA is None: return ff # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2) * rb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2) * rA.conj(), axis = (0, 1)) return ff, Lf, LL def getErrorEstimatorAffine(self, mus:Np1D) -> Np1D: """Standard residual estimator.""" checkIfAffine(self.HFEngine, "apply affinity-based error estimator") self.HFEngine.buildA() self.HFEngine.buildb() mus = self.checkParameterList(mus) tMverb, self.trainedModel.verbosity = self.trainedModel.verbosity, 0 uApproxRs = self.getApproxReduced(mus).data self.trainedModel.verbosity = tMverb muTestEff = self.HFEngine.mapParameterList(mus) radiusA = np.empty((len(self.HFEngine.thAs), len(mus)), dtype = np.complex) radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): radiusA[j] = expressionEvaluator(thA[0], muTestEff) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) radiusA = np.expand_dims(uApproxRs, 1) * radiusA ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 return err def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: setupOK = self.setupApproxLocal() if setupOK > 0: err = np.empty(len(mus)) err[:] = np.nan if not return_max: return err return err, [- setupOK], np.nan mus = self.checkParameterList(mus) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) err = self.getErrorEstimatorAffine(mus) vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if not return_max: return err idxMaxEst = [np.argmax(err)] return err, idxMaxEst, err[idxMaxEst] def _isLastSampleCollinear(self) -> bool: """Check collinearity of last sample.""" if self.collinearityTol <= 0.: return False if self.POD: reff = self.samplingEngine.RPOD[:, -1] else: RROMPyWarning(("Repeated orthogonalization of the samples for " "collinearity check. Consider setting POD to " "True.")) if not hasattr(self, "_PODEngine"): from rrompy.sampling import PODEngine self._PODEngine = PODEngine(self.HFEngine) reff = self._PODEngine.generalizedQR(self.samplingEngine.samples, only_R = True, is_state = True)[:, -1] cLevel = np.abs(reff[-1]) / np.linalg.norm(reff) cLevel = np.inf if np.isclose(cLevel, 0.) else cLevel ** -1. vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 3) return cLevel > self.collinearityTol def plotEstimator(self, est:Np1D, idxMax:List[int], estMax:List[float]): if (not (np.any(np.isnan(est)) or np.any(np.isinf(est))) and masterCore()): fig = plt.figure(figsize = plt.figaspect(1. / self.npar)) for jpar in range(self.npar): ax = fig.add_subplot(1, self.npar, 1 + jpar) musre = np.array(self.muTest.re.data) errCP = copy(est) idx = np.delete(np.arange(self.npar), jpar) while len(musre) > 0: if self.npar == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, idx] - musre[0, idx]), 1), 0.))[0] ax.semilogy(musre[currIdx, jpar], errCP[currIdx], 'k', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) ax.semilogy([self.muBounds.re(0, jpar), self.muBounds.re(-1, jpar)], [self.greedyTol] * 2, 'r--') ax.semilogy(self.mus.re(jpar), 2. * self.greedyTol * np.ones(len(self.mus)), '*m') if len(idxMax) > 0 and estMax is not None: ax.semilogy(self.muTest.re(idxMax, jpar), estMax, 'xr') ax.set_xlim(*list(self.sampler.lims.re(jpar))) ax.grid() plt.tight_layout() plt.show() def greedyNextSample(self, muidx:int, plotEst : str = "NONE")\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") mus = copy(self.muTest[muidx]) self.muTest.pop(muidx) for j, mu in enumerate(mus): vbMng(self, "MAIN", ("Adding sample point no. {} at {} to training " "set.").format(len(self.mus) + 1, mu), 3) self.mus.append(mu) self._S = len(self.mus) self._approxParameters["S"] = self.S if (self.samplingEngine.nsamples <= len(mus) - j - 1 or not np.allclose(mu, self.samplingEngine.mus[j - len(mus)])): self.samplingEngine.nextSample(mu) if self._isLastSampleCollinear(): vbMng(self, "MAIN", ("Collinearity above tolerance detected. Starting " "preemptive greedy loop termination."), 3) self._collinearityFlag = 1 errorEstTest = np.empty(len(self.muTest)) errorEstTest[:] = np.nan return errorEstTest, [-1], np.nan, np.nan errorEstTest, muidx, maxErrorEst = self.errorEstimator(self.muTest, True) if plotEst == "ALL": self.plotEstimator(errorEstTest, muidx, maxErrorEst) return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self.resetSamples() self.computeScaleFactor() self.samplingEngine.scaleFactor = self.scaleFactorDer self.mus = self.trainSetGenerator.generatePoints(self.S) while len(self.mus) > self.S: self.mus.pop() muTestBase = self.sampler.generatePoints(self.nTestPoints, False) idxPop = pruneSamples(self.HFEngine.mapParameterList(muTestBase), self.HFEngine.mapParameterList(self.mus), 1e-10 * self.scaleFactor[0]) muTestBase.pop(idxPop) muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 3) self.samplingEngine.iterSample(self.mus) self._S = len(self.mus) self._approxParameters["S"] = self.S self.muTest = emptyParameterList() self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) self.muTest[: -1] = muTestBase.data self.muTest[-1] = muLast.data @abstractmethod def setupApproxLocal(self) -> int: if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up local approximant.", 5) pass vbMng(self, "DEL", "Done setting up local approximant.", 5) return 0 def setupApprox(self, plotEst : str = "NONE") -> int: """Compute greedy snapshots of solution map.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") vbMng(self, "INIT", "Starting computation of snapshots.", 3) self._collinearityFlag = 0 self._preliminaryTraining() muidx, self.firstGreedyIter = [len(self.muTest) - 1], True errorEstTest, maxErrorEst = [np.inf], np.inf max2ErrorEst, trainedModelOld = np.inf, None while self.firstGreedyIter or (len(self.muTest) > 0 and (maxErrorEst is None or max2ErrorEst > self.greedyTol) and self.samplingEngine.nsamples < self.maxIter): muTestOld, errorEstTestOld = self.muTest, errorEstTest muidxOld, maxErrorEstOld = muidx, maxErrorEst errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( muidx, plotEst) if maxErrorEst is not None and (np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst))): if self._collinearityFlag == 0 and not self.firstGreedyIter: RROMPyWarning(("Instability in a posteriori " "estimator. Starting preemptive greedy " "loop termination.")) self.muTest, errorEstTest = muTestOld, errorEstTestOld + self.mus.pop(-1) + self.samplingEngine.popSample() if self.firstGreedyIter: - self.mus.pop(-1) - self.samplingEngine.popSample() if muidx[0] < 0: self.trainedModel = None raise RROMPyException(("Instability in approximant " "computation. Aborting greedy " "iterations.")) else: self._approxParameters = ( trainedModelOld.data.approxParameters) self._S = trainedModelOld.data.approxParameters["S"] self._approxParameters["S"] = self.S self.trainedModel.data = copy(trainedModelOld.data) muidx, maxErrorEst = muidxOld, maxErrorEstOld break if maxErrorEst is not None: max2ErrorEst = np.max(maxErrorEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(max2ErrorEst), 3) if self.firstGreedyIter: trainedModelOld = copy(self.trainedModel) else: trainedModelOld.data = copy(self.trainedModel.data) self.firstGreedyIter = False if (maxErrorEst is None or max2ErrorEst <= self.greedyTol or np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst))): while self.samplingEngine.nsamples > self.S: self.samplingEngine.popSample() while len(self.mus) > self.S: self.mus.pop(-1) else: self._S = self.samplingEngine.nsamples self._approxParameters["S"] = self.S while len(self.mus) < self.S: self.mus.append(self.samplingEngine.mus[len(self.mus)]) self.setupApproxLocal() if plotEst == "LAST": self.plotEstimator(errorEstTest, muidx, maxErrorEst) vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(self.samplingEngine.nsamples), 3) return 0 def assembleReducedResidualGramian(self, pMat:sampList): """ Build residual gramian of reduced linear system through projections. """ self.initEstimatorNormEngine() if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.estimatorNormEngine.innerProduct(pMat, pMat) else: Sold = self.trainedModel.data.gramian.shape[0] S = len(self.mus) if Sold > S: gramian = self.trainedModel.data.gramian[: S, : S] else: idxOld = list(range(Sold)) idxNew = list(range(Sold, S)) gramian = np.empty((S, S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxOld))) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxNew))) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D]): """ Build blocks (of type bb) of reduced linear system through projections. """ self.initEstimatorNormEngine() nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = bs[i] resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi) for j in range(i): Mbj = bs[j] resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj, Mbi) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:sampList): """ Build blocks (of type Ab) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) nbs = len(bs) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) for j in range(nAs): MAj = dot(As[j], pMat) for i in range(nbs): Mbi = bs[i] resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj, Mbi) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == S: return if Sold > S: resAb = self.trainedModel.data.resAb[:, : S, :] else: if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): MAj = dot(As[j], pMat[:, Sold :]) for i in range(nbs): Mbi = bs[i] resAb[i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj, Mbi)) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList): """ Build blocks (of type AA) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) for i in range(nAs): MAi = dot(As[i], pMat) resAA[:, i, :, i] = ( self.estimatorNormEngine.innerProduct(MAi, MAi)) for j in range(i): MAj = dot(As[j], pMat) resAA[:, i, :, j] = ( self.estimatorNormEngine.innerProduct(MAj, MAi)) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == S: return if Sold > S: resAA = self.trainedModel.data.resAA[: S, :, : S, :] else: if isinstance(pMat, (parameterList, sampleList)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): MAi = dot(As[i], pMat) resAA[: Sold, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) for j in range(i): MAj = dot(As[j], pMat) resAA[: Sold, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, : Sold], MAi[:, Sold :])) resAA[Sold :, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, Sold :])) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of affine decomposition of residual.""" if full: checkIfAffine(self.HFEngine, "assemble reduced residual blocks") else: checkIfAffine(self.HFEngine, "assemble reduced RHS blocks", True) self.HFEngine.buildb() self.assembleReducedResidualBlocksbb(self.HFEngine.bs) if full: pMat = self.samplingEngine.projectionMatrix self.HFEngine.buildA() self.assembleReducedResidualBlocksAb(self.HFEngine.As, self.HFEngine.bs, pMat) self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat) diff --git a/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py index 8849e33..01bb945 100644 --- a/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/standard/greedy/rational_interpolant_greedy.py @@ -1,541 +1,543 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.utilities.poly_fitting.polynomial import (polybases, polyfitname, PolynomialInterpolator as PI, polyvander) from rrompy.utilities.numerical import dot from rrompy.utilities.numerical.degree import totalDegreeN from rrompy.utilities.expression import expressionEvaluator from rrompy.reduction_methods.standard import RationalInterpolant from rrompy.utilities.base.types import Np1D, Tuple, paramVal, List -from rrompy.utilities.base import verbosityManager as vbMng +from rrompy.utilities.base.verbosity_depth import (verbosityManager as vbMng, + getVerbosityDepth, setVerbosityDepth) from rrompy.utilities.poly_fitting import customFit from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert, RROMPy_FRAGILE) from rrompy.sampling import sampleList, emptySampleList __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'scaleFactorDer': scaling factors for derivative computation; defaults to 'AUTO'; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'polybasis': type of basis for interpolation; defaults to 'MONOMIAL'; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'LOOK_AHEAD', 'LOOK_AHEAD_RES', 'LOOK_AHEAD_OUTPUT', and 'NONE'; defaults to 'NONE'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0; - 'correctorForce': whether corrector should forcefully delete bad poles; defaults to False; - 'correctorTol': tolerance for corrector step; defaults to 0., i.e. no bad poles; - 'correctorMaxIter': maximum number of corrector iterations; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'scaleFactorDer': scaling factors for derivative computation; - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'interpRcond': tolerance for interpolation; - 'robustTol': tolerance for robust rational denominator management; - 'correctorForce': whether corrector should forcefully delete bad poles; - 'correctorTol': tolerance for corrector step; - 'correctorMaxIter': maximum number of corrector iterations. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. scaleFactorDer: Scaling factors for derivative computation. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. robustTol: tolerance for robust rational denominator management. correctorForce: Whether corrector should forcefully delete bad poles. correctorTol: Tolerance for corrector step. correctorMaxIter: Maximum number of corrector iterations. errorEstimatorKind: kind of error estimator. interpRcond: tolerance for interpolation. robustTol: tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ _allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "LOOK_AHEAD", "LOOK_AHEAD_RES", "LOOK_AHEAD_OUTPUT", "NONE"] def __init__(self, *args, **kwargs): self._preInit() self._addParametersToList(["errorEstimatorKind"], ["DISCREPANCY"], toBeExcluded = ["M", "N", "polydegreetype", "radialDirectionalWeights"]) super().__init__(*args, **kwargs) if not self.approx_state and self.errorEstimatorKind not in [ "LOOK_AHEAD", "LOOK_AHEAD_OUTPUT", "NONE"]: raise RROMPyException(("Must compute greedy approximation of " "state, unless error estimator allows " "otherwise.")) self._postInit() @property def approx_state(self): """Value of approx_state.""" return self._approx_state @approx_state.setter def approx_state(self, approx_state): RationalInterpolant.approx_state.fset(self, approx_state) if (not self.approx_state and hasattr(self, "_errorEstimatorKind") and self.errorEstimatorKind not in [ "LOOK_AHEAD", "LOOK_AHEAD_OUTPUT", "NONE"]): raise RROMPyException(("Must compute greedy approximation of " "state, unless error estimator allows " "otherwise.")) @property def E(self): """Value of E.""" self._E = self.sampleBatchIdx - 1 return self._E @E.setter def E(self, E): RROMPyWarning(("E is used just to simplify inheritance, and its value " "cannot be changed from that of sampleBatchIdx - 1.")) def _setMAuto(self): self.M = self.E def _setNAuto(self): self.N = self.E @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " "to 'NONE'.")) errorEstimatorKind = "NONE" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind if (self.errorEstimatorKind not in [ "LOOK_AHEAD", "LOOK_AHEAD_OUTPUT", "NONE"] and hasattr(self, "_approx_state") and not self.approx_state): raise RROMPyException(("Must compute greedy approximation of " "state, unless error estimator allows " "otherwise.")) def _polyvanderAuxiliary(self, mus, deg, *args): return polyvander(mus, deg, *args) def getErrorEstimatorDiscrepancy(self, mus:Np1D) -> Np1D: """Discrepancy-based residual estimator.""" checkIfAffine(self.HFEngine, "apply discrepancy-based error estimator") mus = self.checkParameterList(mus) muCTest = self.trainedModel.centerNormalize(mus) tMverb, self.trainedModel.verbosity = self.trainedModel.verbosity, 0 QTest = self.trainedModel.getQVal(mus) QTzero = np.where(QTest == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTest[QTzero] = np.finfo(np.complex).eps / (1. + self.N) self.HFEngine.buildA() self.HFEngine.buildb() nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs muTrainEff = self.HFEngine.mapParameterList(self.mus) muTestEff = self.HFEngine.mapParameterList(mus) PTrain = self.trainedModel.getPVal(self.mus).data.T QTrain = self.trainedModel.getQVal(self.mus) QTzero = np.where(QTrain == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTrain[QTzero] = np.finfo(np.complex).eps / (1. + self.N) PTest = self.trainedModel.getPVal(mus).data self.trainedModel.verbosity = tMverb radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), dtype = np.complex) radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) radiusb = np.empty((nbs, len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): idxs = j * self.S + np.arange(self.S) radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], muTrainEff, (self.S, 1)) * PTrain radiusA[:, j] = PTest * expressionEvaluator(thA[0], muTestEff, (len(mus),)) for j, thb in enumerate(self.HFEngine.thbs): idx = nAs * self.S + j radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], muTrainEff, (self.S,)) radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, (len(mus),)) QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) vanTrain = self._polyvanderAuxiliary(self._musUniqueCN, self.E, self.polybasis0, self._derIdxs, self._reorder) interpPQ = customFit(vanTrain, radiusAbTrain, rcond = self.interpRcond) vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis0) DradiusAb = vanTest.dot(interpPQ) radiusA = (radiusA - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) radiusb = radiusb - DradiusAb[:, - nbs :].T ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5 return err def getErrorEstimatorLookAhead(self, mus:Np1D, what : str = "") -> Tuple[Np1D, List[int]]: """Residual estimator based on look-ahead idea.""" errTest, QTest, idxMaxEst = self._EIMStep(mus) _approx_state_old = self.approx_state if what == "OUTPUT" and _approx_state_old: self._approx_state = False self.initEstimatorNormEngine() self._approx_state = _approx_state_old mu_muTestSample = mus[idxMaxEst] app_muTestSample = self.getApproxReduced(mu_muTestSample) if self._mode == RROMPy_FRAGILE: if what == "RES" and not self.HFEngine.isCEye: raise RROMPyException(("Cannot compute LOOK_AHEAD_RES " "estimator in fragile mode for " "non-scalar C.")) app_muTestSample = dot(self.trainedModel.data.projMat[:, : app_muTestSample.shape[0]], app_muTestSample) else: app_muTestSample = dot(self.samplingEngine.projectionMatrix, app_muTestSample) if what == "RES": errmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, post_c = False) solmu = self.HFEngine.residual(mu_muTestSample, None, post_c = False) else: for j, mu in enumerate(mu_muTestSample): uEx = self.samplingEngine.nextSample(mu) if j == 0: solmu = emptySampleList() solmu.reset((len(uEx), len(mu_muTestSample)), dtype = uEx.dtype) solmu[j] = uEx if what == "OUTPUT" and self.approx_state: solmu = sampleList(self.HFEngine.applyC(solmu)) app_muTestSample = sampleList(self.HFEngine.applyC( app_muTestSample)) errmu = solmu - app_muTestSample errsamples = (self.estimatorNormEngine.norm(errmu) / self.estimatorNormEngine.norm(solmu)) musT = copy(self.mus) musT.append(mu_muTestSample) musT = self.trainedModel.centerNormalize(musT) musC = self.trainedModel.centerNormalize(mus) errT = np.zeros((len(musT), len(mu_muTestSample)), dtype = np.complex) errT[np.arange(len(self.mus), len(musT)), np.arange(len(mu_muTestSample))] = errsamples * QTest[idxMaxEst] vanT = self._polyvanderAuxiliary(musT, self.E + 1, self.polybasis) fitOut = customFit(vanT, errT, full = True, rcond = self.interpRcond) vbMng(self, "MAIN", ("Fitting {} samples with degree {} through {}... Conditioning " "of LS system: {:.4e}.").format(len(vanT), self.E + 1, polyfitname(self.polybasis), fitOut[1][2][0] / fitOut[1][2][-1]), 15) vanC = self._polyvanderAuxiliary(musC, self.E + 1, self.polybasis) err = np.sum(np.abs(vanC.dot(fitOut[0])), axis = -1) / QTest return err, idxMaxEst def getErrorEstimatorNone(self, mus:Np1D) -> Np1D: """EIM-based residual estimator.""" err = np.max(self._EIMStep(mus, True), axis = 1) err *= self.greedyTol / np.mean(err) return err def _EIMStep(self, mus:Np1D, only_one : bool = False) -> Tuple[Np1D, Np1D, List[int]]: """Residual estimator based on look-ahead idea.""" mus = self.checkParameterList(mus) tMverb, self.trainedModel.verbosity = self.trainedModel.verbosity, 0 QTest = self.trainedModel.getQVal(mus) QTzero = np.where(QTest == 0.)[0] if len(QTzero) > 0: RROMPyWarning(("Adjusting estimator to avoid division by " "numerically zero denominator.")) QTest[QTzero] = np.finfo(np.complex).eps / (1. + self.N) QTest = np.abs(QTest) muCTest = self.trainedModel.centerNormalize(mus) muCTrain = self.trainedModel.centerNormalize(self.mus) self.trainedModel.verbosity = tMverb vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis) vanTestNext = self._polyvanderAuxiliary(muCTest, self.E + 1, self.polybasis)[:, vanTest.shape[1] :] idxsTest = np.arange(vanTestNext.shape[1]) basis = np.zeros((len(idxsTest), 0), dtype = float) idxMaxEst = [] while len(idxsTest) > 0: vanTrial = self._polyvanderAuxiliary(muCTrain, self.E, self.polybasis) vanTrialNext = self._polyvanderAuxiliary(muCTrain, self.E + 1, self.polybasis)[:, vanTrial.shape[1] :] vanTrial = np.hstack((vanTrial, vanTrialNext.dot(basis).reshape( len(vanTrialNext), basis.shape[1]))) valuesTrial = vanTrialNext[:, idxsTest] vanTestEff = np.hstack((vanTest, vanTestNext.dot(basis).reshape( len(vanTestNext), basis.shape[1]))) vanTestNextEff = vanTestNext[:, idxsTest] try: coeffTest = np.linalg.solve(vanTrial, valuesTrial) except np.linalg.LinAlgError as e: raise RROMPyException(e) errTest = (np.abs(vanTestNextEff - vanTestEff.dot(coeffTest)) / np.expand_dims(QTest, 1)) if only_one: return errTest idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) idxMaxEst += [idxMaxErr[0]] muCTrain.append(muCTest[idxMaxErr[0]]) basis = np.pad(basis, [(0, 0), (0, 1)], "constant") basis[idxsTest[idxMaxErr[1]], -1] = 1. idxsTest = np.delete(idxsTest, idxMaxErr[1]) return errTest, QTest, idxMaxEst def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: """Standard residual-based error estimator.""" setupOK = self.setupApproxLocal() if setupOK > 0: err = np.empty(len(mus)) err[:] = np.nan if not return_max: return err return err, [- setupOK], np.nan mus = self.checkParameterList(mus) vbMng(self.trainedModel, "INIT", "Evaluating error estimator at mu = {}.".format(mus), 10) if self.errorEstimatorKind == "AFFINE": err = self.getErrorEstimatorAffine(mus) else: self._setupInterpolationIndices() if self.errorEstimatorKind == "DISCREPANCY": err = self.getErrorEstimatorDiscrepancy(mus) elif self.errorEstimatorKind[: 10] == "LOOK_AHEAD": err, idxMaxEst = self.getErrorEstimatorLookAhead(mus, self.errorEstimatorKind[11 :]) else: #if self.errorEstimatorKind == "NONE": err = self.getErrorEstimatorNone(mus) vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) if not return_max: return err if self.errorEstimatorKind[: 10] != "LOOK_AHEAD": idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) errCP = copy(err) for j in range(self.sampleBatchSize): k = np.argmax(errCP) idxMaxEst[j] = k if j + 1 < self.sampleBatchSize: musZero = self.trainedModel.centerNormalize(mus, mus[k]) errCP *= np.linalg.norm(musZero.data, axis = 1) return err, idxMaxEst, err[idxMaxEst] def plotEstimator(self, *args, **kwargs): super().plotEstimator(*args, **kwargs) if self.errorEstimatorKind == "NONE": vbMng(self, "MAIN", ("Warning! Error estimator has been arbitrarily normalized " "before plotting."), 15) def greedyNextSample(self, *args, **kwargs) -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") self.sampleBatchIdx += 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) err, muidx, maxErr, muNext = super().greedyNextSample(*args, **kwargs) if maxErr is not None and (np.any(np.isnan(maxErr)) or np.any(np.isinf(maxErr))): self.sampleBatchIdx -= 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) if (self.errorEstimatorKind == "NONE" and not np.isnan(maxErr) and not np.isinf(maxErr)): maxErr = None return err, muidx, maxErr, muNext def _setSampleBatch(self, maxS:int): self.sampleBatchIdx, self.sampleBatchSize, S = -1, 0, 0 nextBatchSize = 1 while S + nextBatchSize <= maxS: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) return S def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self._S = self._setSampleBatch(self.S) super()._preliminaryTraining() self.M, self.N = ("AUTO",) * 2 def setupApproxLocal(self) -> int: """Compute rational interpolant.""" if self.checkComputedApprox(): return -1 RROMPyAssert(self._mode, message = "Cannot setup approximant.") self.verbosity -= 10 vbMng(self, "INIT", "Setting up local approximant.", 5) pMat = self.samplingEngine.projectionMatrix if self.trainedModel is not None: pMat = pMat[:, len(self.trainedModel.data.mus) :] self._setupTrainedModel(pMat, self.trainedModel is not None) self.catchInstability = 2 + vbDepth = getVerbosityDepth() unstable = False if self.E > 0: try: Q = self._setupDenominator()[0] except RROMPyException as RE: + setVerbosityDepth(vbDepth) RROMPyWarning("Downgraded {}: {}".format(RE.__class__.__name__, RE)) - vbMng(self, "DEL", "", 7) unstable = True else: Q = PI() Q.coeffs = np.ones((1,) * self.npar, dtype = np.complex) Q.npar = self.npar Q.polybasis = self.polybasis if not unstable: self.trainedModel.data.Q = copy(Q) try: P = copy(self._setupNumerator()) except RROMPyException as RE: + setVerbosityDepth(vbDepth) RROMPyWarning("Downgraded {}: {}".format(RE.__class__.__name__, RE)) - vbMng(self, "DEL", "", 7) unstable = True if not unstable: self.trainedModel.data.P = copy(P) self.trainedModel.data.approxParameters = copy( self.approxParameters) vbMng(self, "DEL", "Done setting up local approximant.", 5) self.catchInstability = 0 self.verbosity += 10 return 1 * unstable def setupApprox(self, plotEst : str = "NONE") -> int: val = super().setupApprox(plotEst) if val == 0: self._iterCorrector() self.trainedModel.data.approxParameters = copy( self.approxParameters) return val def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" super().loadTrainedModel(filename) self._setSampleBatch(self.S + 1) diff --git a/rrompy/utilities/base/verbosity_depth.py b/rrompy/utilities/base/verbosity_depth.py index 5672e6c..a9e000a 100644 --- a/rrompy/utilities/base/verbosity_depth.py +++ b/rrompy/utilities/base/verbosity_depth.py @@ -1,86 +1,96 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy from datetime import datetime from rrompy.utilities.exception_manager import RROMPyException __all__ = ["verbosityDepth", "verbosityManager"] def getTimestamp() -> str: time = datetime.now().strftime("%H:%M:%S.%f") return "\x1b[42m{}\x1b[0m".format(time) def updateVerbosityCheckpoint(vctype:int) -> str: global RROMPy_verbosity_checkpoint, RROMPy_verbosity_buffer if "RROMPy_verbosity_checkpoint" not in globals(): RROMPy_verbosity_checkpoint = 0 RROMPy_verbosity_checkpoint += vctype if "RROMPy_verbosity_buffer" not in globals(): RROMPy_verbosity_buffer = "" if RROMPy_verbosity_checkpoint <= 0: buffer = copy(RROMPy_verbosity_buffer) del RROMPy_verbosity_buffer return buffer return None +def getVerbosityDepth() -> int: + global RROMPy_verbosity_depth + if "RROMPy_verbosity_depth" not in globals(): return 0 + return RROMPy_verbosity_depth + +def setVerbosityDepth(depth): + global RROMPy_verbosity_depth + if depth <= 0: + del RROMPy_verbosity_depth + else: + RROMPy_verbosity_depth = depth + def verbosityDepth(vdtype:str, message:str, end : str = "\n", timestamp : bool = True): global RROMPy_verbosity_depth, RROMPy_verbosity_checkpoint, \ RROMPy_verbosity_buffer assert isinstance(vdtype, str) vdtype = vdtype.upper() if vdtype not in ["INIT", "MAIN", "DEL"]: raise RROMPyException("Verbosity depth type not recognized.") if "RROMPy_verbosity_checkpoint" not in globals(): RROMPy_verbosity_checkpoint = 0 - out = "{} ".format(getTimestamp()) if timestamp else "" if vdtype == "INIT": if "RROMPy_verbosity_depth" not in globals(): - RROMPy_verbosity_depth = 0 - RROMPy_verbosity_depth += 1 - out += "│" * (RROMPy_verbosity_depth - 1) + setVerbosityDepth(1) + else: + setVerbosityDepth(RROMPy_verbosity_depth + 1) + assert "RROMPy_verbosity_depth" in globals() + out = "{} ".format(getTimestamp()) if timestamp else "" + out += "│" * (RROMPy_verbosity_depth - 1) + if vdtype == "INIT": out += "┌" - else: - assert "RROMPy_verbosity_depth" in globals() - if vdtype == "MAIN": - out += "│" * (RROMPy_verbosity_depth - 1) - out += "├" - elif vdtype == "DEL": - RROMPy_verbosity_depth -= 1 - out += "│" * RROMPy_verbosity_depth - out += "└" - if RROMPy_verbosity_depth <= 0: del RROMPy_verbosity_depth + elif vdtype == "MAIN": + out += "├" + else: #if vdtype == "DEL": + setVerbosityDepth(RROMPy_verbosity_depth - 1) + out += "└" from rrompy.utilities.parallel import poolRank, poolSize, masterCore if message != "" and masterCore(): if RROMPy_verbosity_checkpoint and poolSize() > 1: poolrk = "{{\x1b[34m{}\x1b[0m}}".format(poolRank()) else: poolrk = "" msg = "{}{}{}{}".format(out, poolrk, message, end) if RROMPy_verbosity_checkpoint: assert "RROMPy_verbosity_buffer" in globals() RROMPy_verbosity_buffer += msg else: print(msg, end = "", flush = True) return def verbosityManager(object, vdtype:str, message:str, vlvl : int = 0, end : str = "\n"): if object.verbosity >= vlvl: return verbosityDepth(vdtype, message, end, object.timestamp) diff --git a/rrompy/utilities/exception_manager/exception_manager.py b/rrompy/utilities/exception_manager/exception_manager.py index 4fffb61..61a00c5 100644 --- a/rrompy/utilities/exception_manager/exception_manager.py +++ b/rrompy/utilities/exception_manager/exception_manager.py @@ -1,32 +1,26 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # __all__ = ["RROMPyException"] -def purgeVerbosityDepth(): - from rrompy.utilities.base.verbosity_depth import verbosityDepth - while True: - try: - verbosityDepth("DEL", "", "", False) - except: - break - class RROMPyException(Exception): def __init__(self, message, purge : bool = True): - if purge: purgeVerbosityDepth() + if purge: + from rrompy.utilities.base.verbosity_depth import setVerbosityDepth + setVerbosityDepth(0) super().__init__(message)