diff --git a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py index 9c8efa2..d75d326 100644 --- a/rrompy/reduction_methods/greedy/generic_greedy_approximant.py +++ b/rrompy/reduction_methods/greedy/generic_greedy_approximant.py @@ -1,668 +1,668 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from rrompy.reduction_methods.standard.generic_standard_approximant \ import GenericStandardApproximant from rrompy.utilities.base.types import (Np1D, Np2D, DictAny, HFEng, Tuple, List, normEng, paramVal, paramList, sampList) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.expression import expressionEvaluator from rrompy.solver import normEngine from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) from rrompy.parameter import checkParameterList, emptyParameterList __all__ = ['GenericGreedyApproximant'] def localL2Distance(mus:Np2D, badmus:Np2D) -> Np2D: return np.linalg.norm(np.tile(mus[..., np.newaxis], [1, 1, len(badmus)]) - badmus[..., np.newaxis].T, axis = 1) def pruneSamples(mus:paramList, badmus:paramList, tol : float = 1e-8) -> Np1D: """Remove from mus all the elements which are too close to badmus.""" if len(badmus) == 0: return mus proximity = np.min(localL2Distance(mus.data, badmus.data), axis = 1) return np.arange(len(mus))[proximity <= tol] class GenericGreedyApproximant(GenericStandardApproximant): """ ROM greedy interpolant computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: Uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() self._addParametersToList(["greedyTol", "collinearityTol", "maxIter", "refinementRatio", "nTestPoints"], [1e-2, 0., 1e2, .2, 5e2], ["trainSetGenerator"], ["AUTO"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def greedyTol(self): """Value of greedyTol.""" return self._greedyTol @greedyTol.setter def greedyTol(self, greedyTol): if greedyTol < 0: raise RROMPyException("greedyTol must be non-negative.") if hasattr(self, "_greedyTol") and self.greedyTol is not None: greedyTolold = self.greedyTol else: greedyTolold = -1 self._greedyTol = greedyTol self._approxParameters["greedyTol"] = self.greedyTol if greedyTolold != self.greedyTol: self.resetSamples() @property def collinearityTol(self): """Value of collinearityTol.""" return self._collinearityTol @collinearityTol.setter def collinearityTol(self, collinearityTol): if collinearityTol < 0: raise RROMPyException("collinearityTol must be non-negative.") if (hasattr(self, "_collinearityTol") and self.collinearityTol is not None): collinearityTolold = self.collinearityTol else: collinearityTolold = -1 self._collinearityTol = collinearityTol self._approxParameters["collinearityTol"] = self.collinearityTol if collinearityTolold != self.collinearityTol: self.resetSamples() @property def maxIter(self): """Value of maxIter.""" return self._maxIter @maxIter.setter def maxIter(self, maxIter): if maxIter <= 0: raise RROMPyException("maxIter must be positive.") if hasattr(self, "_maxIter") and self.maxIter is not None: maxIterold = self.maxIter else: maxIterold = -1 self._maxIter = maxIter self._approxParameters["maxIter"] = self.maxIter if maxIterold != self.maxIter: self.resetSamples() @property def refinementRatio(self): """Value of refinementRatio.""" return self._refinementRatio @refinementRatio.setter def refinementRatio(self, refinementRatio): if refinementRatio <= 0. or refinementRatio > 1.: raise RROMPyException(("refinementRatio must be between 0 " "(excluded) and 1.")) if (hasattr(self, "_refinementRatio") and self.refinementRatio is not None): refinementRatioold = self.refinementRatio else: refinementRatioold = -1 self._refinementRatio = refinementRatio self._approxParameters["refinementRatio"] = self.refinementRatio if refinementRatioold != self.refinementRatio: self.resetSamples() @property def nTestPoints(self): """Value of nTestPoints.""" return self._nTestPoints @nTestPoints.setter def nTestPoints(self, nTestPoints): if nTestPoints <= 0: raise RROMPyException("nTestPoints must be positive.") if not np.isclose(nTestPoints, np.int(nTestPoints)): raise RROMPyException("nTestPoints must be an integer.") nTestPoints = np.int(nTestPoints) if hasattr(self, "_nTestPoints") and self.nTestPoints is not None: nTestPointsold = self.nTestPoints else: nTestPointsold = -1 self._nTestPoints = nTestPoints self._approxParameters["nTestPoints"] = self.nTestPoints if nTestPointsold != self.nTestPoints: self.resetSamples() @property def trainSetGenerator(self): """Value of trainSetGenerator.""" return self._trainSetGenerator @trainSetGenerator.setter def trainSetGenerator(self, trainSetGenerator): if (isinstance(trainSetGenerator, (str,)) and trainSetGenerator.upper() == "AUTO"): trainSetGenerator = self.sampler if 'generatePoints' not in dir(trainSetGenerator): raise RROMPyException("trainSetGenerator type not recognized.") if (hasattr(self, '_trainSetGenerator') and self.trainSetGenerator not in [None, "AUTO"]): trainSetGeneratorOld = self.trainSetGenerator self._trainSetGenerator = trainSetGenerator self._approxParameters["trainSetGenerator"] = self.trainSetGenerator if (not 'trainSetGeneratorOld' in locals() or trainSetGeneratorOld != self.trainSetGenerator): self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._mus = emptyParameterList() def initEstimatorNormEngine(self, normEngn : normEng = None): """Initialize estimator norm engine.""" if (normEngn is not None or not hasattr(self, "estimatorNormEngine") or self.estimatorNormEngine is None): if normEngn is None: if not hasattr(self.HFEngine, "energyNormPartialDualMatrix"): self.HFEngine.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( self.HFEngine.energyNormPartialDualMatrix) else: if hasattr(normEngn, "buildEnergyNormPartialDualForm"): if not hasattr(normEngn, "energyNormPartialDualMatrix"): normEngn.buildEnergyNormPartialDualForm() estimatorEnergyMatrix = ( normEngn.energyNormPartialDualMatrix) else: estimatorEnergyMatrix = normEngn self.estimatorNormEngine = normEngine(estimatorEnergyMatrix) def _affineResidualMatricesContraction(self, rb:Np2D, rA : Np2D = None) \ -> Tuple[Np1D, Np1D, Np1D]: self.assembleReducedResidualBlocks(full = rA is not None) # 'ij,jk,ik->k', resbb, radiusb, radiusb.conj() ff = np.sum(self.trainedModel.data.resbb.dot(rb) * rb.conj(), axis = 0) if rA is None: return ff # 'ijk,jkl,il->l', resAb, radiusA, radiusb.conj() Lf = np.sum(np.tensordot(self.trainedModel.data.resAb, rA, 2) * rb.conj(), axis = 0) # 'ijkl,klt,ijt->t', resAA, radiusA, radiusA.conj() LL = np.sum(np.tensordot(self.trainedModel.data.resAA, rA, 2) * rA.conj(), axis = (0, 1)) return ff, Lf, LL - def errorEstimator(self, mus:Np1D) -> Np1D: - """Standard residual-based error estimator.""" - self.setupApproxLocal() + def getErrorEstimatorAffine(self, mus:Np1D) -> Np1D: + """Standard residual estimator.""" checkIfAffine(self.HFEngine, "apply affinity-based error estimator") self.HFEngine.buildA() self.HFEngine.buildb() mus = checkParameterList(mus, self.npar)[0] - vbMng(self.trainedModel, "INIT", - "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 uApproxRs = self.getApproxReduced(mus) muTestEff = mus ** self.HFEngine.rescalingExp radiusA = np.empty((len(self.HFEngine.thAs), len(mus)), dtype = np.complex) radiusb = np.empty((len(self.HFEngine.thbs), len(mus)), dtype = np.complex) for j, thA in enumerate(self.HFEngine.thAs): radiusA[j] = expressionEvaluator(thA[0], muTestEff) for j, thb in enumerate(self.HFEngine.thbs): radiusb[j] = expressionEvaluator(thb[0], muTestEff) radiusA = np.expand_dims(uApproxRs.data, 1) * radiusA ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) err = np.abs((LL - 2. * np.real(Lf) + ff) / ff) ** .5 self.trainedModel.verbosity = verb - vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) return err - def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: - """ - Compute maximum of (and index of maximum of) error estimator over given - parameters. - """ - errorEstTest = self.errorEstimator(mus) - idxMaxEst = [np.argmax(errorEstTest)] - return errorEstTest, idxMaxEst, errorEstTest[idxMaxEst] + def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: + self.setupApproxLocal() + mus = checkParameterList(mus, self.npar)[0] + vbMng(self.trainedModel, "INIT", + "Evaluating error estimator at mu = {}.".format(mus), 10) + err = self.getErrorEstimatorAffine(mus) + vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) + if not return_max: return err + idxMaxEst = [np.argmax(err)] + return err, idxMaxEst, err[idxMaxEst] def _isLastSampleCollinear(self) -> bool: """Check collinearity of last sample.""" if self.collinearityTol <= 0.: return False if self.POD: reff = self.samplingEngine.RPOD[:, -1] else: RROMPyWarning(("Repeated orthogonalization of the samples for " "collinearity check. Consider setting POD to " "True.")) if not hasattr(self, "_PODEngine"): from rrompy.sampling.base.pod_engine import PODEngine self._PODEngine = PODEngine(self.HFEngine) reff = self._PODEngine.generalizedQR(self.samplingEngine.samples, only_R = True, is_state = True)[:, -1] cLevel = np.abs(reff[-1]) / np.linalg.norm(reff) cLevel = np.inf if np.isclose(cLevel, 0.) else cLevel ** -1. vbMng(self, "MAIN", "Collinearity indicator {:.4e}.".format(cLevel), 5) return cLevel > self.collinearityTol def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") mus = copy(self.muTest[muidx]) self.muTest.pop(muidx) for j, mu in enumerate(mus): vbMng(self, "MAIN", ("Adding sample point no. {} at {} to training " "set.").format(len(self.mus) + 1, mu), 2) self.mus.append(mu) self._S = len(self.mus) self._approxParameters["S"] = self.S - if not np.allclose(mu, self.samplingEngine.mus.data[j - len(mus)]): + if (self.samplingEngine.nsamples <= len(mus) - j - 1 + or not np.allclose(mu, + self.samplingEngine.mus.data[j - len(mus)])): self.samplingEngine.nextSample(mu) if self._isLastSampleCollinear(): vbMng(self, "MAIN", ("Collinearity above tolerance detected. Starting " "preemptive greedy loop termination."), 2) self._collinearityFlag = 1 errorEstTest = np.empty(len(self.muTest)) errorEstTest[:] = np.nan return errorEstTest, [-1], np.nan, np.nan - errorEstTest, muidx, maxErrorEst = self.getMaxErrorEstimator( - self.muTest) + errorEstTest, muidx, maxErrorEst = self.errorEstimator(self.muTest, + True) if (plotEst and not np.any(np.isnan(errorEstTest)) and not np.any(np.isinf(errorEstTest))): musre = copy(self.muTest.re.data) from matplotlib import pyplot as plt plt.figure() errCP = copy(errorEstTest) while len(musre) > 0: if self.npar == 1: currIdx = np.arange(len(musre)) else: currIdx = np.where(np.isclose(np.sum( np.abs(musre[:, 1 :] - musre[0, 1 :]), 1), 0.))[0] plt.semilogy(musre[currIdx, 0], errCP[currIdx], 'k', linewidth = 1) musre = np.delete(musre, currIdx, 0) errCP = np.delete(errCP, currIdx) plt.semilogy([self.muTest.re(0, 0), self.muTest.re(-1, 0)], [self.greedyTol] * 2, 'r--') plt.semilogy(self.mus.re(0), 2. * self.greedyTol * np.ones(len(self.mus)), '*m') plt.semilogy(self.muTest.re(muidx, 0), maxErrorEst, 'xr') plt.grid() plt.show() plt.close() return errorEstTest, muidx, maxErrorEst, self.muTest[muidx] def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return self.computeScaleFactor() self.resetSamples() self.mus = self.trainSetGenerator.generatePoints(self.S)[ list(range(self.S))] muTestBase = self.sampler.generatePoints(self.nTestPoints) idxPop = pruneSamples(muTestBase ** self.HFEngine.rescalingExp, self.mus ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestBase.pop(idxPop) muTestBase = muTestBase.sort() muLast = copy(self.mus[-1]) self.mus.pop() if len(self.mus) > 0: vbMng(self, "MAIN", ("Adding first {} sample point{} at {} to training " "set.").format(self.S - 1, "" + "s" * (self.S > 2), self.mus), 2) self.samplingEngine.iterSample(self.mus) self._S = len(self.mus) self._approxParameters["S"] = self.S self.muTest = emptyParameterList() self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) self.muTest[: -1] = muTestBase.data self.muTest[-1] = muLast.data def _enrichTestSet(self, nTest:int): """Add extra elements to test set.""" RROMPyAssert(self._mode, message = "Cannot enrich test set.") muTestExtra = self.sampler.generatePoints(2 * nTest) muTotal = copy(self.mus) muTotal.append(self.muTest) idxPop = pruneSamples(muTestExtra ** self.HFEngine.rescalingExp, muTotal ** self.HFEngine.rescalingExp, 1e-10 * self.scaleFactor[0]) muTestExtra.pop(idxPop) muTestNew = np.empty((len(self.muTest) + len(muTestExtra), self.muTest.shape[1]), dtype = np.complex) muTestNew[: len(self.muTest)] = self.muTest.data muTestNew[len(self.muTest) :] = muTestExtra.data self.muTest = checkParameterList(muTestNew, self.npar)[0].sort() vbMng(self, "MAIN", "Enriching test set by {} elements.".format(len(muTestExtra)), 5) def setupApprox(self, plotEst : bool = False): """Compute greedy snapshots of solution map.""" if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") vbMng(self, "INIT", "Starting computation of snapshots.", 2) self._preliminaryTraining() nTest = self.nTestPoints muT0 = copy(self.muTest[-1]) errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( [len(self.muTest) - 1], plotEst) self._collinearityFlag = 0 if maxErrorEst is not None and np.any(np.isnan(maxErrorEst)): RROMPyWarning(("Instability in a posteriori estimator. " "Starting preemptive greedy loop termination.")) self.muTest.append(muT0) self.mus.pop(-1) self.samplingEngine.popSample() self.setupApproxLocal() else: if maxErrorEst is not None: max2ErrorEst = np.max(maxErrorEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(max2ErrorEst), 2) trainedModelOld = copy(self.trainedModel) while (self.samplingEngine.nsamples < self.maxIter and (maxErrorEst is None or max2ErrorEst > self.greedyTol)): if (1. - self.refinementRatio) * nTest > len(self.muTest): self._enrichTestSet(nTest) nTest = len(self.muTest) muTestOld = self.muTest errorEstTest, muidx, maxErrorEst, mu = self.greedyNextSample( muidx, plotEst) if maxErrorEst is not None and (np.any(np.isnan(maxErrorEst)) or np.any(np.isinf(maxErrorEst))): if self._collinearityFlag == 0: RROMPyWarning(("Instability in a posteriori " "estimator. Starting preemptive greedy " "loop termination.")) self.muTest = muTestOld self._approxParameters = ( trainedModelOld.data.approxParameters) self._S = trainedModelOld.data.approxParameters["S"] self._approxParameters["S"] = self.S self.trainedModel.data = copy(trainedModelOld.data) break if maxErrorEst is not None: max2ErrorEst = np.max(maxErrorEst) vbMng(self, "MAIN", ("Uniform testing error estimate " "{:.4e}.").format(max2ErrorEst), 2) trainedModelOld.data = copy(self.trainedModel.data) while len(self.mus) > self.S: self.mus.pop(-1) while self.samplingEngine.nsamples > self.S: self.samplingEngine.popSample() vbMng(self, "DEL", ("Done computing snapshots (final snapshot count: " "{}).").format(self.samplingEngine.nsamples), 2) def checkComputedApprox(self) -> bool: """ Check if setup of new approximant is not needed. Returns: True if new setup is not needed. False otherwise. """ return (super().checkComputedApprox() and len(self.mus) == self.trainedModel.data.projMat.shape[1]) def assembleReducedResidualGramian(self, pMat:sampList): """ Build residual gramian of reduced linear system through projections. """ self.initEstimatorNormEngine() if (not hasattr(self.trainedModel.data, "gramian") or self.trainedModel.data.gramian is None): gramian = self.estimatorNormEngine.innerProduct(pMat, pMat) else: Sold = self.trainedModel.data.gramian.shape[0] S = len(self.mus) if Sold > S: gramian = self.trainedModel.data.gramian[: S, : S] else: idxOld = list(range(Sold)) idxNew = list(range(Sold, S)) gramian = np.empty((S, S), dtype = np.complex) gramian[: Sold, : Sold] = self.trainedModel.data.gramian gramian[: Sold, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxOld))) gramian[Sold :, : Sold] = gramian[: Sold, Sold :].T.conj() gramian[Sold :, Sold :] = ( self.estimatorNormEngine.innerProduct(pMat(idxNew), pMat(idxNew))) self.trainedModel.data.gramian = gramian def assembleReducedResidualBlocksbb(self, bs:List[Np1D]): """ Build blocks (of type bb) of reduced linear system through projections. """ self.initEstimatorNormEngine() nbs = len(bs) if (not hasattr(self.trainedModel.data, "resbb") or self.trainedModel.data.resbb is None): resbb = np.empty((nbs, nbs), dtype = np.complex) for i in range(nbs): Mbi = bs[i] resbb[i, i] = self.estimatorNormEngine.innerProduct(Mbi, Mbi) for j in range(i): Mbj = bs[j] resbb[i, j] = self.estimatorNormEngine.innerProduct(Mbj, Mbi) for i in range(nbs): for j in range(i + 1, nbs): resbb[i, j] = resbb[j, i].conj() self.trainedModel.data.resbb = resbb def assembleReducedResidualBlocksAb(self, As:List[Np2D], bs:List[Np1D], pMat:sampList): """ Build blocks (of type Ab) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) nbs = len(bs) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAb") or self.trainedModel.data.resAb is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) for j in range(nAs): MAj = dot(As[j], pMat) for i in range(nbs): Mbi = bs[i] resAb[i, :, j] = self.estimatorNormEngine.innerProduct(MAj, Mbi) else: Sold = self.trainedModel.data.resAb.shape[1] if Sold == S: return if Sold > S: resAb = self.trainedModel.data.resAb[:, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAb = np.empty((nbs, S, nAs), dtype = np.complex) resAb[:, : Sold, :] = self.trainedModel.data.resAb for j in range(nAs): MAj = dot(As[j], pMat[:, Sold :]) for i in range(nbs): Mbi = bs[i] resAb[i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj, Mbi)) self.trainedModel.data.resAb = resAb def assembleReducedResidualBlocksAA(self, As:List[Np2D], pMat:sampList): """ Build blocks (of type AA) of reduced linear system through projections. """ self.initEstimatorNormEngine() nAs = len(As) S = len(self.mus) if (not hasattr(self.trainedModel.data, "resAA") or self.trainedModel.data.resAA is None): if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) for i in range(nAs): MAi = dot(As[i], pMat) resAA[:, i, :, i] = ( self.estimatorNormEngine.innerProduct(MAi, MAi)) for j in range(i): MAj = dot(As[j], pMat) resAA[:, i, :, j] = ( self.estimatorNormEngine.innerProduct(MAj, MAi)) for i in range(nAs): for j in range(i + 1, nAs): resAA[:, i, :, j] = resAA[:, j, :, i].T.conj() else: Sold = self.trainedModel.data.resAA.shape[0] if Sold == S: return if Sold > S: resAA = self.trainedModel.data.resAA[: S, :, : S, :] else: if not isinstance(pMat, (np.ndarray,)): pMat = pMat.data resAA = np.empty((S, nAs, S, nAs), dtype = np.complex) resAA[: Sold, :, : Sold, :] = self.trainedModel.data.resAA for i in range(nAs): MAi = dot(As[i], pMat) resAA[: Sold, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, i] = resAA[: Sold, i, Sold :, i].T.conj() resAA[Sold :, i, Sold :, i] = ( self.estimatorNormEngine.innerProduct(MAi[:, Sold :], MAi[:, Sold :])) for j in range(i): MAj = dot(As[j], pMat) resAA[: Sold, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, : Sold])) resAA[Sold :, i, : Sold, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, : Sold], MAi[:, Sold :])) resAA[Sold :, i, Sold :, j] = ( self.estimatorNormEngine.innerProduct(MAj[:, Sold :], MAi[:, Sold :])) for i in range(nAs): for j in range(i + 1, nAs): resAA[: Sold, i, Sold :, j] = ( resAA[Sold :, j, : Sold, i].T.conj()) resAA[Sold :, i, : Sold, j] = ( resAA[: Sold, j, Sold :, i].T.conj()) resAA[Sold :, i, Sold :, j] = ( resAA[Sold :, j, Sold :, i].T.conj()) self.trainedModel.data.resAA = resAA def assembleReducedResidualBlocks(self, full : bool = False): """Build affine blocks of affine decomposition of residual.""" if full: checkIfAffine(self.HFEngine, "assemble reduced residual blocks") else: checkIfAffine(self.HFEngine, "assemble reduced RHS blocks", True) self.HFEngine.buildb() self.assembleReducedResidualBlocksbb(self.HFEngine.bs) if full: pMat = self.samplingEngine.samples self.HFEngine.buildA() self.assembleReducedResidualBlocksAb(self.HFEngine.As, self.HFEngine.bs, pMat) self.assembleReducedResidualBlocksAA(self.HFEngine.As, pMat) diff --git a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py index 54c2a90..f3acc23 100644 --- a/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py +++ b/rrompy/reduction_methods/greedy/rational_interpolant_greedy.py @@ -1,462 +1,518 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from rrompy.hfengines.base.linear_affine_engine import checkIfAffine from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.utilities.poly_fitting.polynomial import (polybases, PolynomialInterpolator as PI, polyvanderTotal as pvT) from rrompy.utilities.numerical import totalDegreeN, dot from rrompy.utilities.expression import expressionEvaluator from rrompy.reduction_methods.standard import RationalInterpolant from rrompy.utilities.base.types import (Np1D, Tuple, DictAny, HFEng, paramVal, - paramList) + List) from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.poly_fitting import customFit from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException, RROMPyAssert, RROMPy_FRAGILE) from rrompy.parameter import checkParameterList from rrompy.sampling import emptySampleList __all__ = ['RationalInterpolantGreedy'] class RationalInterpolantGreedy(GenericGreedyApproximant, RationalInterpolant): """ ROM greedy rational interpolant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler; - 'polybasis': type of basis for interpolation; defaults to 'MONOMIAL'; - 'errorEstimatorKind': kind of error estimator; available values include 'AFFINE', 'DISCREPANCY', 'INTERPOLATORY', - 'EIM_INTERPOLATORY', and 'EIM_DIAGONAL'; defaults to - 'DISCREPANCY'; + 'LOOK_AHEAD', and 'NONE'; defaults to 'NONE'; - 'interpRcond': tolerance for interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator; - 'errorEstimatorKind': kind of error estimator; - 'interpRcond': tolerance for interpolation; - 'robustTol': tolerance for robust rational denominator management. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. robustTol: tolerance for robust rational denominator management. errorEstimatorKind: kind of error estimator. interpRcond: tolerance for interpolation. robustTol: tolerance for robust rational denominator management. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ _allowedEstimatorKinds = ["AFFINE", "DISCREPANCY", "INTERPOLATORY", "LOOK_AHEAD", "NONE"] def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList(["errorEstimatorKind"], ["DISCREPANCY"], toBeExcluded = ["M", "N", "polydegreetype", "radialDirectionalWeights", "nNearestNeighbor"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def E(self): """Value of E.""" self._E = self.sampleBatchIdx - 1 return self._E @E.setter def E(self, E): RROMPyWarning(("E is used just to simplify inheritance, and its value " "cannot be changed from that of sampleBatchIdx - 1.")) @property def polydegreetype(self): """Value of polydegreetype.""" return "TOTAL" @polydegreetype.setter def polydegreetype(self, polydegreetype): RROMPyWarning(("polydegreetype is used just to simplify inheritance, " "and its value cannot be changed from 'TOTAL'.")) @property def polybasis(self): """Value of polybasis.""" return self._polybasis @polybasis.setter def polybasis(self, polybasis): try: polybasis = polybasis.upper().strip().replace(" ","") if polybasis not in polybases: raise RROMPyException("Sample type not recognized.") self._polybasis = polybasis except: RROMPyWarning(("Prescribed polybasis not recognized. Overriding " "to 'MONOMIAL'.")) self._polybasis = "MONOMIAL" self._approxParameters["polybasis"] = self.polybasis @property def errorEstimatorKind(self): """Value of errorEstimatorKind.""" return self._errorEstimatorKind @errorEstimatorKind.setter def errorEstimatorKind(self, errorEstimatorKind): errorEstimatorKind = errorEstimatorKind.upper() if errorEstimatorKind not in self._allowedEstimatorKinds: RROMPyWarning(("Error estimator kind not recognized. Overriding " - "to 'DISCREPANCY'.")) - errorEstimatorKind = "DISCREPANCY" + "to 'NONE'.")) + errorEstimatorKind = "NONE" self._errorEstimatorKind = errorEstimatorKind self._approxParameters["errorEstimatorKind"] = self.errorEstimatorKind - def errorEstimator(self, mus:Np1D) -> Np1D: - """Standard residual-based error estimator.""" - if self.errorEstimatorKind == "AFFINE": - return super().errorEstimator(mus) - setupOK = self.setupApproxLocal() - if not setupOK: - err = np.empty(len(mus)) - err[:] = np.nan - return err - self._setupInterpolationIndices() + def _polyvanderAuxiliary(self, mus, deg, *args): + return pvT(mus, deg, *args) + + def getErrorEstimatorDiscrepancy(self, mus:Np1D) -> Np1D: + """Discrepancy-based residual estimator.""" mus = checkParameterList(mus, self.npar)[0] muCTest = self.trainedModel.centerNormalize(mus) - vbMng(self.trainedModel, "INIT", - "Evaluating error estimator at mu = {}.".format(mus), 10) verb = self.trainedModel.verbosity self.trainedModel.verbosity = 0 QTest = self.trainedModel.getQVal(mus) - if self.errorEstimatorKind == "DISCREPANCY": - checkIfAffine(self.HFEngine, - "apply discrepancy-based error estimator") - self.HFEngine.buildA() - self.HFEngine.buildb() - nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs - muTrainEff = self.mus ** self.HFEngine.rescalingExp - muTestEff = mus ** self.HFEngine.rescalingExp - PTrain = self.trainedModel.getPVal(self.mus).data.T - QTrain = self.trainedModel.getQVal(self.mus) - PTest = self.trainedModel.getPVal(mus).data - radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), - dtype = np.complex) - radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) - radiusb = np.empty((nbs, len(mus)), dtype = np.complex) - for j, thA in enumerate(self.HFEngine.thAs): - idxs = j * self.S + np.arange(self.S) - radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], - muTrainEff, (self.S, 1)) * PTrain - radiusA[:, j] = PTest * expressionEvaluator(thA[0], - muTestEff, (len(mus),)) - for j, thb in enumerate(self.HFEngine.thbs): - idx = nAs * self.S + j - radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], + checkIfAffine(self.HFEngine, "apply discrepancy-based error estimator") + self.HFEngine.buildA() + self.HFEngine.buildb() + nAs, nbs = self.HFEngine.nAs, self.HFEngine.nbs + muTrainEff = self.mus ** self.HFEngine.rescalingExp + muTestEff = mus ** self.HFEngine.rescalingExp + PTrain = self.trainedModel.getPVal(self.mus).data.T + QTrain = self.trainedModel.getQVal(self.mus) + PTest = self.trainedModel.getPVal(mus).data + radiusAbTrain = np.empty((self.S, nAs * self.S + nbs), + dtype = np.complex) + radiusA = np.empty((self.S, nAs, len(mus)), dtype = np.complex) + radiusb = np.empty((nbs, len(mus)), dtype = np.complex) + for j, thA in enumerate(self.HFEngine.thAs): + idxs = j * self.S + np.arange(self.S) + radiusAbTrain[:, idxs] = expressionEvaluator(thA[0], muTrainEff, + (self.S, 1)) * PTrain + radiusA[:, j] = PTest * expressionEvaluator(thA[0], muTestEff, + (len(mus),)) + for j, thb in enumerate(self.HFEngine.thbs): + idx = nAs * self.S + j + radiusAbTrain[:, idx] = QTrain * expressionEvaluator(thb[0], muTrainEff, (self.S,)) - radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, - (len(mus),)) - QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) - vanTrain = pvT(self._musUniqueCN, self.E, self.polybasis0, - self._derIdxs, self._reorder) - interpPQ = customFit(vanTrain, radiusAbTrain, - rcond = self.interpRcond) - vanTest = pvT(muCTest, self.E, self.polybasis0) - DradiusAb = vanTest.dot(interpPQ) - radiusA = (radiusA - - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) - radiusb = radiusb - DradiusAb[:, - nbs :].T - ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, - radiusA) - err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5 - else: #if self.errorEstimatorKind in ["INTERPOLATORY", "LOOK_AHEAD", "NONE"]: - QTest = np.abs(QTest) - muCTest = self.trainedModel.centerNormalize(mus) - muCTrain = self.trainedModel.centerNormalize(self.mus) - vanTest = pvT(muCTest, self.E, self.polybasis) - vanTestNext = pvT(muCTest, self.E + 1, self.polybasis)[:, + radiusb[j] = QTest * expressionEvaluator(thb[0], muTestEff, + (len(mus),)) + QRHSNorm2 = self._affineResidualMatricesContraction(radiusb) + vanTrain = self._polyvanderAuxiliary(self._musUniqueCN, self.E, + self.polybasis0, self._derIdxs, + self._reorder) + interpPQ = customFit(vanTrain, radiusAbTrain, + rcond = self.interpRcond) + vanTest = self._polyvanderAuxiliary(muCTest, self.E, + self.polybasis0) + DradiusAb = vanTest.dot(interpPQ) + radiusA = (radiusA + - DradiusAb[:, : - nbs].reshape(len(mus), -1, self.S).T) + radiusb = radiusb - DradiusAb[:, - nbs :].T + ff, Lf, LL = self._affineResidualMatricesContraction(radiusb, radiusA) + err = np.abs((LL - 2. * np.real(Lf) + ff) / QRHSNorm2) ** .5 + self.trainedModel.verbosity = verb + return err + + def getErrorEstimatorInterpolatory(self, mus:Np1D) -> Np1D: + """Interpolation-based residual estimator.""" + errTest, QTest, idxMaxEst = self._EIMStep(mus) + self.initEstimatorNormEngine() + mu_muTestSample = mus[idxMaxEst] + app_muTestSample = self.getApproxReduced(mu_muTestSample) + if self._mode == RROMPy_FRAGILE: + if not self.HFEngine.isCEye: + raise RROMPyException(("Cannot compute INTERPOLATORY residual " + "estimator in fragile mode for " + "non-scalar C.")) + app_muTestSample = dot(self.trainedModel.data.projMat, + app_muTestSample.data) + else: + app_muTestSample = dot(self.samplingEngine.samples, + app_muTestSample) + resmu = self.HFEngine.residual(mu_muTestSample, app_muTestSample, + post_c = False) + RHSmu = self.HFEngine.residual(mu_muTestSample, None, post_c = False) + ressamples = (self.estimatorNormEngine.norm(resmu) + / self.estimatorNormEngine.norm(RHSmu)) + # improve the following by explicitly constructing + # (tensorised) interpolant as in while loop + musT = copy(self.mus) + musT.append(mu_muTestSample) + musT = self.trainedModel.centerNormalize(musT) + musC = self.trainedModel.centerNormalize(mus) + resT = np.zeros(len(musT), dtype = np.complex) + err = np.zeros(len(mus)) + for l in range(len(mu_muTestSample)): + resT[len(self.mus) + l] = ressamples[l] * QTest[idxMaxEst[l]] + p = PI() + wellCond, msg = p.setupByInterpolation(musT, resT, self.E + 1, + self.polybasis, self.verbosity >= 15, + True, {}, {"rcond": self.interpRcond}) + err += np.abs(p(musC)) + resT[len(self.mus) + l] = 0. + err /= QTest + vbMng(self, "MAIN", msg, 15) + return err + + def getErrorEstimatorLookAhead(self, mus:Np1D) -> Tuple[Np1D, List[int]]: + """Residual estimator based on look-ahead idea.""" + errTest, QTest, idxMaxEst = self._EIMStep(mus) + self.initEstimatorNormEngine() + mu_muTestSample = mus[idxMaxEst] + app_muTestSample = self.getApproxReduced(mu_muTestSample) + if self._mode == RROMPy_FRAGILE: + app_muTestSample = dot(self.trainedModel.data.projMat, + app_muTestSample.data) + else: + app_muTestSample = dot(self.samplingEngine.samples, + app_muTestSample) + for j, mu in enumerate(mu_muTestSample): + self.samplingEngine.nextSample(mu) + if hasattr(self.samplingEngine, "samples_full"): + uEx = self.samplingEngine.samples_full[-1] + else: + uEx = self.samplingEngine.samples[-1] + if j == 0: + RHSmu = emptySampleList() + RHSmu.reset((len(uEx), len(mu_muTestSample)), + dtype = uEx.dtype) + RHSmu[j] = uEx + resmu = RHSmu - app_muTestSample + ressamples = (self.estimatorNormEngine.norm(resmu) + / self.estimatorNormEngine.norm(RHSmu)) + # improve the following by explicitly constructing + # (tensorised) interpolant as in while loop + musT = copy(self.mus) + musT.append(mu_muTestSample) + musT = self.trainedModel.centerNormalize(musT) + musC = self.trainedModel.centerNormalize(mus) + resT = np.zeros(len(musT), dtype = np.complex) + err = np.zeros(len(mus)) + for l in range(len(mu_muTestSample)): + resT[len(self.mus) + l] = ressamples[l] * QTest[idxMaxEst[l]] + p = PI() + wellCond, msg = p.setupByInterpolation(musT, resT, self.E + 1, + self.polybasis, self.verbosity >= 15, + True, {}, {"rcond": self.interpRcond}) + err += np.abs(p(musC)) + resT[len(self.mus) + l] = 0. + err /= QTest + vbMng(self, "MAIN", msg, 15) + return err, idxMaxEst + + def getErrorEstimatorNone(self, mus:Np1D) -> Np1D: + """EIM-based residual estimator.""" + err = np.max(self._EIMStep(mus, True), axis = 1) + err *= self.greedyTol / np.mean(err) + return err + + def _EIMStep(self, mus:Np1D, + only_one : bool = False) -> Tuple[Np1D, Np1D, List[int]]: + """Residual estimator based on look-ahead idea.""" + mus = checkParameterList(mus, self.npar)[0] + muCTest = self.trainedModel.centerNormalize(mus) + verb = self.trainedModel.verbosity + self.trainedModel.verbosity = 0 + QTest = self.trainedModel.getQVal(mus) + QTest = np.abs(QTest) + muCTest = self.trainedModel.centerNormalize(mus) + muCTrain = self.trainedModel.centerNormalize(self.mus) + vanTest = self._polyvanderAuxiliary(muCTest, self.E, self.polybasis) + vanTestNext = self._polyvanderAuxiliary(muCTest, self.E + 1, + self.polybasis)[:, vanTest.shape[1] :] - idxsTest = np.arange(vanTestNext.shape[1]) - basis = np.zeros((len(idxsTest), 0), dtype = float) - idxMaxEst = [] - while len(idxsTest) > 0: - vanTrial = pvT(muCTrain, self.E, self.polybasis) - vanTrialNext = pvT(muCTrain, self.E + 1, self.polybasis)[:, + idxsTest = np.arange(vanTestNext.shape[1]) + basis = np.zeros((len(idxsTest), 0), dtype = float) + idxMaxEst = [] + while len(idxsTest) > 0: + vanTrial = self._polyvanderAuxiliary(muCTrain, self.E, + self.polybasis) + vanTrialNext = self._polyvanderAuxiliary(muCTrain, self.E + 1, + self.polybasis)[:, vanTrial.shape[1] :] - vanTrial = np.hstack((vanTrial, - vanTrialNext.dot(basis).reshape( + vanTrial = np.hstack((vanTrial, vanTrialNext.dot(basis).reshape( len(vanTrialNext), basis.shape[1]))) - valuesTrial = vanTrialNext[:, idxsTest] - vanTestEff = np.hstack((vanTest, - vanTestNext.dot(basis).reshape( + valuesTrial = vanTrialNext[:, idxsTest] + vanTestEff = np.hstack((vanTest, vanTestNext.dot(basis).reshape( len(vanTestNext), basis.shape[1]))) - vanTestNextEff = vanTestNext[:, idxsTest] - coeffTest = np.linalg.solve(vanTrial, valuesTrial) - errTest = (np.abs(vanTestNextEff - vanTestEff.dot(coeffTest)) - / np.expand_dims(QTest, 1)) - if self.errorEstimatorKind == "NONE": break - idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) - idxMaxEst += [idxMaxErr[0]] - muCTrain.append(muCTest[idxMaxErr[0]]) - basis = np.pad(basis, [(0, 0), (0, 1)], "constant") - basis[idxsTest[idxMaxErr[1]], -1] = 1. - idxsTest = np.delete(idxsTest, idxMaxErr[1]) - if self.errorEstimatorKind in ["INTERPOLATORY", "LOOK_AHEAD"]: - self.initEstimatorNormEngine() - mu_muTestSample = mus[idxMaxEst] - app_muTestSample = self.getApproxReduced(mu_muTestSample) - if self._mode == RROMPy_FRAGILE: - if (self.errorEstimatorKind == "INTERPOLATORY" - and not self.HFEngine.isCEye): - raise RROMPyException(("Cannot compute INTERPOLATORY " - "residual estimator in fragile " - "mode for non-scalar C.")) - app_muTestSample = dot(self.trainedModel.data.projMat, - app_muTestSample.data) - else: - app_muTestSample = dot(self.samplingEngine.samples, - app_muTestSample) - if self.errorEstimatorKind == "INTERPOLATORY": - resmu = self.HFEngine.residual(mu_muTestSample, - app_muTestSample, - post_c = False) - RHSmu = self.HFEngine.residual(mu_muTestSample, None, - post_c = False) - else: #if self.errorEstimatorKind == "LOOK_AHEAD": - for j, mu in enumerate(mu_muTestSample): - self.samplingEngine.nextSample(mu) - if hasattr(self.samplingEngine, "samples_full"): - uEx = self.samplingEngine.samples_full[-1] - else: - uEx = self.samplingEngine.samples[-1] - if j == 0: - RHSmu = emptySampleList() - RHSmu.reset((len(uEx), len(mu_muTestSample)), - dtype = uEx.dtype) - RHSmu[j] = uEx - resmu = RHSmu - app_muTestSample - ressamples = (self.estimatorNormEngine.norm(resmu) - / self.estimatorNormEngine.norm(RHSmu)) - # improve the following by explicitly constructing (tensorised) - # interpolant as in while loop - musT = copy(self.mus) - musT.append(mu_muTestSample) - musT = self.trainedModel.centerNormalize(musT) - musC = self.trainedModel.centerNormalize(mus) - resT = np.zeros(len(musT), dtype = np.complex) - err = np.zeros(len(mus)) - for l in range(len(mu_muTestSample)): - resT[len(self.mus) + l] = (ressamples[l] - * QTest[idxMaxEst[l]]) - p = PI() - wellCond, msg = p.setupByInterpolation(musT, resT, - self.E + 1, self.polybasis, - self.verbosity >= 15, True, {}, - {"rcond": self.interpRcond}) - err += np.abs(p(musC)) - resT[len(self.mus) + l] = 0. - err /= QTest - vbMng(self, "MAIN", msg, 15) - else: #if self.errorEstimatorKind == "NONE": - err = np.max(errTest, axis = 1) - err *= self.greedyTol / np.mean(err) + vanTestNextEff = vanTestNext[:, idxsTest] + coeffTest = np.linalg.solve(vanTrial, valuesTrial) + errTest = (np.abs(vanTestNextEff - vanTestEff.dot(coeffTest)) + / np.expand_dims(QTest, 1)) + if only_one: return errTest + idxMaxErr = np.unravel_index(np.argmax(errTest), errTest.shape) + idxMaxEst += [idxMaxErr[0]] + muCTrain.append(muCTest[idxMaxErr[0]]) + basis = np.pad(basis, [(0, 0), (0, 1)], "constant") + basis[idxsTest[idxMaxErr[1]], -1] = 1. + idxsTest = np.delete(idxsTest, idxMaxErr[1]) self.trainedModel.verbosity = verb + return errTest, QTest, idxMaxEst + + def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: + """Standard residual-based error estimator.""" + setupOK = self.setupApproxLocal() + if not setupOK: + err = np.empty(len(mus)) + err[:] = np.nan + if not return_max: return err + return err, 0, np.nan + mus = checkParameterList(mus, self.npar)[0] + vbMng(self.trainedModel, "INIT", + "Evaluating error estimator at mu = {}.".format(mus), 10) + if self.errorEstimatorKind == "AFFINE": + err = self.getErrorEstimatorAffine(mus) + else: + self._setupInterpolationIndices() + if self.errorEstimatorKind == "DISCREPANCY": + err = self.getErrorEstimatorDiscrepancy(mus) + elif self.errorEstimatorKind == "INTERPOLATORY": + err = self.getErrorEstimatorInterpolatory(mus) + elif self.errorEstimatorKind == "LOOK_AHEAD": + err, idxMaxEst = self.getErrorEstimatorLookAhead(mus) + else: #if self.errorEstimatorKind == "NONE": + err = self.getErrorEstimatorNone(mus) vbMng(self.trainedModel, "DEL", "Done evaluating error estimator", 10) - return err - - def getMaxErrorEstimator(self, mus:paramList) -> Tuple[Np1D, int, float]: - """ - Compute maximum of (and index of maximum of) error estimator over given - parameters. - """ - errorEstTest = self.errorEstimator(mus) - idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) - errCP = copy(errorEstTest) - for j in range(self.sampleBatchSize): - k = np.argmax(errCP) - idxMaxEst[j] = k - if j + 1 < self.sampleBatchSize: - musZero = self.trainedModel.centerNormalize(mus, mus[k]) - errCP *= np.linalg.norm(musZero.data, axis = 1) - maxEst = errorEstTest[idxMaxEst] - return errorEstTest, idxMaxEst, maxEst + if not return_max: return err + if self.errorEstimatorKind != "LOOK_AHEAD": + idxMaxEst = np.empty(self.sampleBatchSize, dtype = int) + errCP = copy(err) + for j in range(self.sampleBatchSize): + k = np.argmax(errCP) + idxMaxEst[j] = k + if j + 1 < self.sampleBatchSize: + musZero = self.trainedModel.centerNormalize(mus, mus[k]) + errCP *= np.linalg.norm(musZero.data, axis = 1) + return err, idxMaxEst, err[idxMaxEst] def greedyNextSample(self, muidx:int, plotEst : bool = False)\ -> Tuple[Np1D, int, float, paramVal]: """Compute next greedy snapshot of solution map.""" RROMPyAssert(self._mode, message = "Cannot add greedy sample.") self.sampleBatchIdx += 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) err, muidx, maxErr, muNext = super().greedyNextSample(muidx, plotEst) if maxErr is not None and (np.any(np.isnan(maxErr)) or np.any(np.isinf(maxErr))): self.sampleBatchIdx -= 1 self.sampleBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx) if (self.errorEstimatorKind == "NONE" and not np.isnan(maxErr) and not np.isinf(maxErr)): maxErr = None return err, muidx, maxErr, muNext def _preliminaryTraining(self): """Initialize starting snapshots of solution map.""" RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") if self.samplingEngine.nsamples > 0: return S = self.S self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0 nextBatchSize = 1 while self._S + nextBatchSize <= S: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize self._S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) super()._preliminaryTraining() def setupApproxLocal(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return True RROMPyAssert(self._mode, message = "Cannot setup approximant.") - vbMng(self, "INIT", "Setting up {}.".format(self.name()), 5) + self.verbosity -= 10 + vbMng(self, "INIT", "Setting up local approximant.", 5) self._N, self._M = [self.E] * 2 pMat = self.samplingEngine.samples.data pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.mus = copy(self.mus) self.catchInstability = True if self.N > 0: try: Q = self._setupDenominator()[0] except RROMPyException as RE: RROMPyWarning(RE._msg) - vbMng(self, "DEL", "Done setting up approximant.", 5) + vbMng(self, "DEL", "Done setting up local approximant.", 5) return False else: Q = PI() - Q.coeffs = np.ones(1, dtype = np.complex) - Q.npar = 1 + Q.coeffs = np.ones((1,) * self.npar, dtype = np.complex) + Q.npar = self.npar Q.polybasis = self.polybasis self.trainedModel.data.Q = copy(Q) try: self.trainedModel.data.P = copy(self._setupNumerator()) except RROMPyException as RE: RROMPyWarning(RE._msg) - vbMng(self, "DEL", "Done setting up approximant.", 5) + vbMng(self, "DEL", "Done setting up local approximant.", 5) return False self.trainedModel.data.approxParameters = copy(self.approxParameters) - vbMng(self, "DEL", "Done setting up approximant.", 5) + vbMng(self, "DEL", "Done setting up local approximant.", 5) + self.verbosity += 10 return True def loadTrainedModel(self, filename:str): """Load trained reduced model from file.""" super().loadTrainedModel(filename) self.sampleBatchIdx, self.sampleBatchSize, _S = -1, 0, 0 nextBatchSize = 1 while _S + nextBatchSize <= self.S + 1: self.sampleBatchIdx += 1 self.sampleBatchSize = nextBatchSize _S += self.sampleBatchSize nextBatchSize = totalDegreeN(self.npar - 1, self.sampleBatchIdx + 1) diff --git a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py index 88830f1..001e35a 100644 --- a/rrompy/reduction_methods/greedy/reduced_basis_greedy.py +++ b/rrompy/reduction_methods/greedy/reduced_basis_greedy.py @@ -1,178 +1,180 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy from .generic_greedy_approximant import GenericGreedyApproximant from rrompy.reduction_methods.standard import ReducedBasis from rrompy.utilities.base.types import DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot from rrompy.utilities.exception_manager import RROMPyWarning, RROMPyAssert __all__ = ['ReducedBasisGreedy'] class ReducedBasisGreedy(GenericGreedyApproximant, ReducedBasis): """ ROM greedy RB approximant computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'S': number of starting training points; - 'sampler': sample point generator; - 'greedyTol': uniform error tolerance for greedy algorithm; defaults to 1e-2; - 'collinearityTol': collinearity tolerance for greedy algorithm; defaults to 0.; - 'maxIter': maximum number of greedy steps; defaults to 1e2; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; defaults to 0.2; - 'nTestPoints': number of test points; defaults to 5e2; - 'trainSetGenerator': training sample points generator; defaults to sampler. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults and must be True. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. mus: Array of snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots. - 'greedyTol': uniform error tolerance for greedy algorithm; - 'collinearityTol': collinearity tolerance for greedy algorithm; - 'maxIter': maximum number of greedy steps; - 'refinementRatio': ratio of test points to be exhausted before test set refinement; - 'nTestPoints': number of test points; - 'trainSetGenerator': training sample points generator. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of samples current approximant relies upon; - 'sampler': sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: whether to compute POD of snapshots. S: number of test points. sampler: Sample point generator. greedyTol: uniform error tolerance for greedy algorithm. collinearityTol: Collinearity tolerance for greedy algorithm. maxIter: maximum number of greedy steps. refinementRatio: ratio of training points to be exhausted before training set refinement. nTestPoints: number of starting training points. trainSetGenerator: training sample points generator. muBounds: list of bounds for parameter values. samplingEngine: Sampling engine. estimatorNormEngine: Engine for estimator norm computation. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. As: List of sparse matrices (in CSC format) representing coefficients of linear system matrix. bs: List of numpy vectors representing coefficients of linear system RHS. ARBs: List of sparse matrices (in CSC format) representing coefficients of compressed linear system matrix. bRBs: List of numpy vectors representing coefficients of compressed linear system RHS. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, approxParameters : DictAny = {}, approx_state : bool = True, verbosity : int = 10, timestamp : bool = True): if not approx_state: RROMPyWarning("Overriding approx_state to True.") self._preInit() self._addParametersToList(toBeExcluded = ["R", "PODTolerance"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = True, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def R(self): """Value of R.""" self._R = self._S return self._R @R.setter def R(self, R): RROMPyWarning(("R is used just to simplify inheritance, and its value " "cannot be changed from that of S.")) @property def PODTolerance(self): """Value of PODTolerance.""" self._PODTolerance = -1 return self._PODTolerance @PODTolerance.setter def PODTolerance(self, PODTolerance): RROMPyWarning(("PODTolerance is used just to simplify inheritance, " "and its value cannot be changed from -1.")) def setupApproxLocal(self): """Compute RB projection matrix.""" if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") - vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) + self.verbosity -= 10 + vbMng(self, "INIT", "Setting up local approximant.", 5) vbMng(self, "INIT", "Computing projection matrix.", 7) pMat = self.samplingEngine.samples.data pMatEff = dot(self.HFEngine.C, pMat) if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp} data = self.initializeModelData(datadict)[0] ARBs, bRBs = self.assembleReducedSystem(pMat) data.affinePoly = self.HFEngine.affinePoly self.HFEngine.buildA() self.HFEngine.buildb() data.thAs, data.thbs = self.HFEngine.thAs, self.HFEngine.thbs self.trainedModel.data = data else: self.trainedModel = self.trainedModel Sold = self.trainedModel.data.projMat.shape[1] ARBs, bRBs = self.assembleReducedSystem(pMat[:, Sold :], pMat[:, : Sold]) self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.mus = copy(self.mus) self.trainedModel.data.ARBs = ARBs self.trainedModel.data.bRBs = bRBs vbMng(self, "DEL", "Done computing projection matrix.", 7) self.trainedModel.data.approxParameters = copy(self.approxParameters) - vbMng(self, "DEL", "Done setting up approximant.", 5) + vbMng(self, "DEL", "Done setting up local approximant.", 5) + self.verbosity += 10 diff --git a/rrompy/reduction_methods/pivoted/__init__.py b/rrompy/reduction_methods/pivoted/__init__.py index 4ea30f9..09efffa 100644 --- a/rrompy/reduction_methods/pivoted/__init__.py +++ b/rrompy/reduction_methods/pivoted/__init__.py @@ -1,27 +1,29 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from .generic_pivoted_approximant import GenericPivotedApproximant from .rational_interpolant_pivoted import RationalInterpolantPivoted +from .rational_interpolant_greedy_pivoted import RationalInterpolantGreedyPivoted __all__ = [ 'GenericPivotedApproximant', - 'RationalInterpolantPivoted' + 'RationalInterpolantPivoted', + 'RationalInterpolantGreedyPivoted' ] diff --git a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py index 433ded1..2966787 100644 --- a/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py +++ b/rrompy/reduction_methods/pivoted/generic_pivoted_approximant.py @@ -1,532 +1,523 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from copy import deepcopy as copy from rrompy.reduction_methods.base.generic_approximant import ( GenericApproximant) from rrompy.utilities.poly_fitting.polynomial import (polybases as ppb, PolynomialInterpolator as PI) from rrompy.utilities.poly_fitting.radial_basis import (polybases as rbpb, RadialBasisInterpolator as RBI) from rrompy.utilities.poly_fitting.moving_least_squares import ( polybases as mlspb, MovingLeastSquaresInterpolator as MLSI) from rrompy.sampling.pivoted import (SamplingEnginePivoted, SamplingEnginePivotedPOD) from rrompy.utilities.base.types import ListAny, DictAny, HFEng, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import (fullDegreeN, totalDegreeN, nextDerivativeIndices) from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, RROMPyWarning) -from rrompy.parameter import emptyParameterList __all__ = ['GenericPivotedApproximant'] class GenericPivotedApproximant(GenericApproximant): """ ROM pivoted approximant (with pole matching) computation for parametric problems (ABSTRACT). Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'cutOffTolerance': tolerance for ignoring parasitic poles; defaults to np.inf; - 'cutOffType': rule for tolerance computation for parasitic poles; defaults to 'MAGNITUDE'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'MMarginal': degree of marginal interpolant; defaults to 0; - 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 0, i.e. identity; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; defaults to -1; - 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. - musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'matchingWeight': weight for pole matching optimization; - 'cutOffTolerance': tolerance for ignoring parasitic poles; - 'cutOffType': rule for tolerance computation for parasitic poles; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'MMarginal': degree of marginal interpolant; - 'polydegreetypeMarginal': type of polynomial degree for marginal; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; - 'interpRcondMarginal': tolerance for marginal interpolation. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. matchingWeight: Weight for pole matching optimization. cutOffTolerance: Tolerance for ignoring parasitic poles. cutOffType: Rule for tolerance computation for parasitic poles. S: Total number of pivot samples current approximant relies upon. samplerPivot: Pivot sample point generator. SMarginal: Total number of marginal samples current approximant relies upon. samplerMarginal: Marginal sample point generator. polybasisMarginal: Type of polynomial basis for marginal interpolation. MMarginal: Degree of marginal interpolant. polydegreetypeMarginal: Type of polynomial degree for marginal. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. nNearestNeighborMarginal: Number of marginal nearest neighbors considered if polybasisMarginal allows. interpRcondMarginal: Tolerance for marginal interpolation. muBoundsPivot: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, directionPivot : ListAny = [0], approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() if len(directionPivot) > 1: raise RROMPyException(("Exactly 1 pivot parameter allowed in pole " "matching.")) from rrompy.parameter.parameter_sampling import QuadratureSampler as QS QSBase = QS([[0], [1]], "UNIFORM") self._addParametersToList(["matchingWeight", "cutOffTolerance", "cutOffType", "polybasisMarginal", "MMarginal", "polydegreetypeMarginal", "radialDirectionalWeightsMarginal", "nNearestNeighborMarginal", "interpRcondMarginal"], [1, np.inf, "MAGNITUDE", "MONOMIAL", 0, "TOTAL", 1, -1, -1], ["samplerPivot", "SMarginal", "samplerMarginal"], [QSBase, [1], QSBase]) del QS self._directionPivot = directionPivot super().__init__(HFEngine = HFEngine, mu0 = mu0, approxParameters = approxParameters, approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() def setupSampling(self): """Setup sampling engine.""" RROMPyAssert(self._mode, message = "Cannot setup sampling engine.") if not hasattr(self, "_POD") or self._POD is None: return if self.POD: SamplingEngine = SamplingEnginePivotedPOD else: SamplingEngine = SamplingEnginePivoted self.samplingEngine = SamplingEngine(self.HFEngine, self.directionPivot, sample_state = self.approx_state, verbosity = self.verbosity) + def initializeModelData(self, datadict): + if "directionPivot" in datadict.keys(): + from rrompy.reduction_methods.trained_model import \ + TrainedModelPivotedData + return (TrainedModelPivotedData(datadict["mu0"], + datadict.pop("projMat"), + datadict["scaleFactor"], + datadict.pop("rescalingExp"), + datadict["directionPivot"]), + ["mu0", "scaleFactor", "directionPivot", "mus"]) + else: + return super().initializeModelData(datadict) + + @property + def npar(self): + """Number of parameters.""" + if hasattr(self, "_temporaryPivot"): return self.nparPivot + return super().npar + @property def mus(self): """Value of mus. Its assignment may reset snapshots.""" return self._mus @mus.setter def mus(self, mus): musOld = copy(self.mus) if hasattr(self, '_mus') else None if (musOld is None or len(mus) != len(musOld) or not mus == musOld): self.resetSamples() self._mus = mus @property def matchingWeight(self): """Value of matchingWeight.""" return self._matchingWeight @matchingWeight.setter def matchingWeight(self, matchingWeight): self._matchingWeight = matchingWeight self._approxParameters["matchingWeight"] = self.matchingWeight @property def cutOffTolerance(self): """Value of cutOffTolerance.""" return self._cutOffTolerance @cutOffTolerance.setter def cutOffTolerance(self, cutOffTolerance): self._cutOffTolerance = cutOffTolerance self._approxParameters["cutOffTolerance"] = self.cutOffTolerance @property def cutOffType(self): """Value of cutOffType.""" return self._cutOffType @cutOffType.setter def cutOffType(self, cutOffType): try: cutOffType = cutOffType.upper().strip().replace(" ","") if cutOffType not in ["MAGNITUDE", "POTENTIAL"]: raise RROMPyException("Prescribed cutOffType not recognized.") self._cutOffType = cutOffType except: RROMPyWarning(("Prescribed cutOffType not recognized. Overriding " "to 'MAGNITUDE'.")) self._cutOffType = "MAGNITUDE" self._approxParameters["cutOffType"] = self.cutOffType @property def SMarginal(self): """Value of SMarginal.""" return self._SMarginal @SMarginal.setter def SMarginal(self, SMarginal): if SMarginal <= 0: raise RROMPyException("SMarginal must be positive.") if hasattr(self, "_SMarginal") and self._SMarginal is not None: Sold = self.SMarginal else: Sold = -1 self._SMarginal = SMarginal self._approxParameters["SMarginal"] = self.SMarginal if Sold != self.SMarginal: self.resetSamples() @property def polybasisMarginal(self): """Value of polybasisMarginal.""" return self._polybasisMarginal @polybasisMarginal.setter def polybasisMarginal(self, polybasisMarginal): try: polybasisMarginal = polybasisMarginal.upper().strip().replace(" ", "") if polybasisMarginal not in ppb + rbpb + mlspb: raise RROMPyException( "Prescribed marginal polybasis not recognized.") self._polybasisMarginal = polybasisMarginal except: RROMPyWarning(("Prescribed marginal polybasis not recognized. " "Overriding to 'MONOMIAL'.")) self._polybasisMarginal = "MONOMIAL" self._approxParameters["polybasisMarginal"] = self.polybasisMarginal @property def MMarginal(self): """Value of MMarginal.""" return self._MMarginal @MMarginal.setter def MMarginal(self, MMarginal): if MMarginal < 0: raise RROMPyException("MMarginal must be non-negative.") self._MMarginal = MMarginal self._approxParameters["MMarginal"] = self.MMarginal @property def polydegreetypeMarginal(self): """Value of polydegreetypeMarginal.""" return self._polydegreetypeMarginal @polydegreetypeMarginal.setter def polydegreetypeMarginal(self, polydegreetypeM): try: polydegreetypeM = polydegreetypeM.upper().strip().replace(" ","") if polydegreetypeM not in ["TOTAL", "FULL"]: raise RROMPyException(("Prescribed polydegreetypeMarginal not " "recognized.")) self._polydegreetypeMarginal = polydegreetypeM except: RROMPyWarning(("Prescribed polydegreetypeMarginal not recognized. " "Overriding to 'TOTAL'.")) self._polydegreetypeMarginal = "TOTAL" self._approxParameters["polydegreetypeMarginal"] = ( self.polydegreetypeMarginal) @property def radialDirectionalWeightsMarginal(self): """Value of radialDirectionalWeightsMarginal.""" return self._radialDirectionalWeightsMarginal @radialDirectionalWeightsMarginal.setter def radialDirectionalWeightsMarginal(self, radialDirWeightsMarginal): self._radialDirectionalWeightsMarginal = radialDirWeightsMarginal self._approxParameters["radialDirectionalWeightsMarginal"] = ( self.radialDirectionalWeightsMarginal) @property def nNearestNeighborMarginal(self): """Value of nNearestNeighborMarginal.""" return self._nNearestNeighborMarginal @nNearestNeighborMarginal.setter def nNearestNeighborMarginal(self, nNearestNeighborMarginal): self._nNearestNeighborMarginal = nNearestNeighborMarginal self._approxParameters["nNearestNeighborMarginal"] = ( self.nNearestNeighborMarginal) @property def interpRcondMarginal(self): """Value of interpRcondMarginal.""" return self._interpRcondMarginal @interpRcondMarginal.setter def interpRcondMarginal(self, interpRcondMarginal): self._interpRcondMarginal = interpRcondMarginal self._approxParameters["interpRcondMarginal"] = ( self.interpRcondMarginal) @property def directionPivot(self): """Value of directionPivot. Its assignment may reset snapshots.""" return self._directionPivot @directionPivot.setter def directionPivot(self, directionPivot): if hasattr(self, '_directionPivot'): directionPivotOld = copy(self.directionPivot) else: directionPivotOld = None if (directionPivotOld is None or len(directionPivot) != len(directionPivotOld) or not directionPivot == directionPivotOld): self.resetSamples() self._directionPivot = directionPivot @property def directionMarginal(self): return [x for x in range(self.HFEngine.npar) \ if x not in self.directionPivot] @property def nparPivot(self): return len(self.directionPivot) @property def nparMarginal(self): return self.npar - self.nparPivot @property def rescalingExpPivot(self): return [self.HFEngine.rescalingExp[x] for x in self.directionPivot] @property def rescalingExpMarginal(self): return [self.HFEngine.rescalingExp[x] for x in self.directionMarginal] @property def muBoundsPivot(self): """Value of muBoundsPivot.""" return self.samplerPivot.lims @property def muBoundsMarginal(self): """Value of muBoundsMarginal.""" return self.samplerMarginal.lims @property def samplerPivot(self): """Value of samplerPivot.""" return self._samplerPivot @samplerPivot.setter def samplerPivot(self, samplerPivot): if 'generatePoints' not in dir(samplerPivot): raise RROMPyException("Pivot sampler type not recognized.") if hasattr(self, '_samplerPivot') and self._samplerPivot is not None: samplerOld = self.samplerPivot self._samplerPivot = samplerPivot self._approxParameters["samplerPivot"] = self.samplerPivot.__str__() if not 'samplerOld' in locals() or samplerOld != self.samplerPivot: self.resetSamples() @property def samplerMarginal(self): """Value of samplerMarginal.""" return self._samplerMarginal @samplerMarginal.setter def samplerMarginal(self, samplerMarginal): if 'generatePoints' not in dir(samplerMarginal): raise RROMPyException("Marginal sampler type not recognized.") if (hasattr(self, '_samplerMarginal') and self._samplerMarginal is not None): samplerOld = self.samplerMarginal self._samplerMarginal = samplerMarginal self._approxParameters["samplerMarginal"] = ( self.samplerMarginal.__str__()) if not 'samplerOld' in locals() or samplerOld != self.samplerMarginal: self.resetSamples() def resetSamples(self): """Reset samples.""" super().resetSamples() self._musMUniqueCN = None self._derMIdxs = None self._reorderM = None def setSamples(self, samplingEngine): """Copy samplingEngine and samples.""" super().setSamples(samplingEngine) self.mus = copy(self.samplingEngine[0].mus) for sEj in self.samplingEngine[1:]: self.mus.append(sEj.mus) - def computeSnapshots(self): - """Compute snapshots of solution map.""" - RROMPyAssert(self._mode, - message = "Cannot start snapshot computation.") - if self.samplingEngine.nsamplesTot != self.S * self.SMarginal: - self.computeScaleFactor() - self.resetSamples() - vbMng(self, "INIT", "Starting computation of snapshots.", 5) - self.musPivot = self.samplerPivot.generatePoints(self.S) - self.musMarginal = self.samplerMarginal.generatePoints( - self.SMarginal) - self.mus = emptyParameterList() - self.mus.reset((self.S * self.SMarginal, self.HFEngine.npar)) - self.samplingEngine.resetHistory(self.SMarginal) - for j, muMarg in enumerate(self.musMarginal): - for k in range(j * self.S, (j + 1) * self.S): - self.mus.data[k, self.directionPivot] = ( - self.musPivot[k - j * self.S].data) - self.mus.data[k, self.directionMarginal] = muMarg.data - self.samplingEngine.iterSample(self.musPivot, self.musMarginal) - if self.POD: - self.samplingEngine.coalesceSamples(self.interpRcondMarginal) - else: - self.samplingEngine.coalesceSamples() - vbMng(self, "DEL", "Done computing snapshots.", 5) - def _setupMarginalInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if (self._musMUniqueCN is None or len(self._reorderM) != len(self.musMarginal)): self._musMUniqueCN, musMIdxsTo, musMIdxs, musMCount = ( self.trainedModel.centerNormalizeMarginal(self.musMarginal)\ .unique(return_index = True, return_inverse = True, return_counts = True)) self._musMUnique = self.musMarginal[musMIdxsTo] self._derMIdxs = [None] * len(self._musMUniqueCN) self._reorderM = np.empty(len(musMIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musMCount): self._derMIdxs[j] = nextDerivativeIndices([], self.nparMarginal, cnt) jIdx = np.nonzero(musMIdxs == j)[0] self._reorderM[jIdx] = np.arange(filled, filled + cnt) filled += cnt def _setupMarginalInterp(self): """Compute marginal interpolator.""" RROMPyAssert(self._mode, message = "Cannot setup numerator.") vbMng(self, "INIT", "Starting computation of marginal interpolator.", 7) self._setupMarginalInterpolationIndices() if self.polydegreetypeMarginal == "TOTAL": cfun = totalDegreeN else: cfun = fullDegreeN MM = copy(self.MMarginal) while len(self.musMarginal) < cfun(MM, self.nparMarginal): MM -= 1 if MM < self.MMarginal: RROMPyWarning( ("MMarginal too large compared to SMarginal. " "Reducing MMarginal by {}").format(self.MMarginal - MM)) self.MMarginal = MM mI = [] for j in range(len(self.musMarginal)): canonicalj = 1. * (np.arange(len(self.musMarginal)) == j) self._MMarginal = MM while self.MMarginal >= 0: pParRest = [self.verbosity >= 5, self.polydegreetypeMarginal == "TOTAL", {"derIdxs": self._derMIdxs, "reorder": self._reorderM, "scl": np.power(self.scaleFactorMarginal, -1.)}] if self.polybasisMarginal in ppb: p = PI() else: pParRest = ([self.radialDirectionalWeightsMarginal] + pParRest) pParRest[-1]["nNearestNeighbor"] = ( self.nNearestNeighborMarginal) p = RBI() if self.polybasisMarginal in rbpb else MLSI() if self.polybasisMarginal in ppb + rbpb: pParRest += [{"rcond": self.interpRcondMarginal}] wellCond, msg = p.setupByInterpolation(self._musMUniqueCN, canonicalj, self.MMarginal, self.polybasisMarginal, *pParRest) vbMng(self, "MAIN", msg, 5) if wellCond: break RROMPyWarning(("Polyfit is poorly conditioned. Reducing " "MMarginal by 1.")) self.MMarginal = self.MMarginal - 1 mI = mI + [copy(p)] vbMng(self, "DEL", "Done computing marginal interpolator.", 7) return mI def computeScaleFactor(self): """Compute parameter rescaling factor.""" RROMPyAssert(self._mode, message = "Cannot compute rescaling factor.") self.scaleFactorPivot = .5 * np.abs( self.muBoundsPivot[0] ** self.rescalingExpPivot - self.muBoundsPivot[1] ** self.rescalingExpPivot) self.scaleFactorMarginal = .5 * np.abs( self.muBoundsMarginal[0] ** self.rescalingExpMarginal - self.muBoundsMarginal[1] ** self.rescalingExpMarginal) self.scaleFactor = np.empty(self.npar) self.scaleFactor[self.directionPivot] = self.scaleFactorPivot self.scaleFactor[self.directionMarginal] = self.scaleFactorMarginal diff --git a/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py b/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py new file mode 100644 index 0000000..914e0c3 --- /dev/null +++ b/rrompy/reduction_methods/pivoted/rational_interpolant_greedy_pivoted.py @@ -0,0 +1,433 @@ +# Copyright (C) 2018 by the RROMPy authors +# +# This file is part of RROMPy. +# +# RROMPy is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# RROMPy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with RROMPy. If not, see . +# + +from copy import deepcopy as copy +import numpy as np +from .generic_pivoted_approximant import GenericPivotedApproximant +from rrompy.reduction_methods.greedy.rational_interpolant_greedy import ( + RationalInterpolantGreedy) +from rrompy.reduction_methods.greedy.generic_greedy_approximant import ( + pruneSamples) +from rrompy.utilities.base.types import Np1D, HFEng, DictAny, ListAny, paramVal +from rrompy.utilities.base import verbosityManager as vbMng +from rrompy.utilities.numerical import totalDegreeN, dot +from rrompy.utilities.poly_fitting.polynomial import polyvander as pv +from rrompy.utilities.exception_manager import RROMPyAssert +from rrompy.parameter import emptyParameterList, parameterList + +__all__ = ['RationalInterpolantGreedyPivoted'] + +class RationalInterpolantGreedyPivoted(GenericPivotedApproximant, + RationalInterpolantGreedy): + """ + ROM pivoted rational interpolant (with pole matching) computation for + parametric problems. + + Args: + HFEngine: HF problem solver. + mu0(optional): Default parameter. Defaults to 0. + directionPivot(optional): Pivot components. Defaults to [0]. + approxParameters(optional): Dictionary containing values for main + parameters of approximant. Recognized keys are: + - 'POD': whether to compute POD of snapshots; defaults to True; + - 'matchingWeight': weight for pole matching optimization; defaults + to 1; + - 'cutOffTolerance': tolerance for ignoring parasitic poles; + defaults to np.inf; + - 'cutOffType': rule for tolerance computation for parasitic poles; + defaults to 'MAGNITUDE'; + - 'S': total number of pivot samples current approximant relies + upon; + - 'samplerPivot': pivot sample point generator; + - 'SMarginal': total number of marginal samples current approximant + relies upon; + - 'samplerMarginal': marginal sample point generator; + - 'polybasis': type of polynomial basis for pivot + interpolation; defaults to 'MONOMIAL'; + - 'polybasisMarginal': type of polynomial basis for marginal + interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' + and 'LEGENDRE'; defaults to 'MONOMIAL'; + - 'greedyTol': uniform error tolerance for greedy algorithm; + defaults to 1e-2; + - 'collinearityTol': collinearity tolerance for greedy algorithm; + defaults to 0.; + - 'maxIter': maximum number of greedy steps; defaults to 1e2; + - 'refinementRatio': ratio of test points to be exhausted before + test set refinement; defaults to 0.2; + - 'nTestPoints': number of test points; defaults to 5e2; + - 'trainSetGenerator': training sample points generator; defaults + to sampler; + - 'errorEstimatorKind': kind of error estimator; available values + include 'AFFINE', 'DISCREPANCY', 'INTERPOLATORY', + 'LOOK_AHEAD', and 'NONE'; defaults to 'NONE'; + - 'MMarginal': degree of marginal interpolant; defaults to 0; + - 'polydegreetypeMarginal': type of polynomial degree for marginal; + defaults to 'TOTAL'; + - 'radialDirectionalWeightsMarginal': radial basis weights for + marginal interpolant; defaults to 0, i.e. identity; + - 'nNearestNeighborMarginal': number of marginal nearest neighbors + considered if polybasisMarginal allows; defaults to -1; + - 'interpRcond': tolerance for pivot interpolation; defaults to + None; + - 'interpRcondMarginal': tolerance for marginal interpolation; + defaults to None; + - 'robustTol': tolerance for robust rational denominator + management; defaults to 0. + Defaults to empty dict. + approx_state(optional): Whether to approximate state. Defaults to + False. + verbosity(optional): Verbosity level. Defaults to 10. + + Attributes: + HFEngine: HF problem solver. + mu0: Default parameter. + directionPivot: Pivot components. + mus: Array of snapshot parameters. + musPivot: Array of pivot snapshot parameters. + musMarginal: Array of marginal snapshot parameters. + approxParameters: Dictionary containing values for main parameters of + approximant. Recognized keys are in parameterList. + parameterListSoft: Recognized keys of soft approximant parameters: + - 'POD': whether to compute POD of snapshots; + - 'matchingWeight': weight for pole matching optimization; + - 'cutOffTolerance': tolerance for ignoring parasitic poles; + - 'cutOffType': rule for tolerance computation for parasitic poles; + - 'polybasis': type of polynomial basis for pivot + interpolation; + - 'polybasisMarginal': type of polynomial basis for marginal + interpolation; + - 'greedyTol': uniform error tolerance for greedy algorithm; + - 'collinearityTol': collinearity tolerance for greedy algorithm; + - 'maxIter': maximum number of greedy steps; + - 'refinementRatio': ratio of test points to be exhausted before + test set refinement; + - 'nTestPoints': number of test points; + - 'trainSetGenerator': training sample points generator; + - 'errorEstimatorKind': kind of error estimator; + - 'MMarginal': degree of marginal interpolant; + - 'polydegreetypeMarginal': type of polynomial degree for marginal; + - 'radialDirectionalWeights': radial basis weights for pivot + numerator; + - 'radialDirectionalWeightsMarginal': radial basis weights for + marginal interpolant; + - 'nNearestNeighbor': number of pivot nearest neighbors considered + if polybasis allows; + - 'nNearestNeighborMarginal': number of marginal nearest neighbors + considered if polybasisMarginal allows; + - 'interpRcond': tolerance for pivot interpolation; + - 'interpRcondMarginal': tolerance for marginal interpolation; + - 'robustTol': tolerance for robust rational denominator + management; + - 'correctorForce': whether corrector should forcefully delete bad + poles; + - 'correctorTol': tolerance for corrector step; + - 'correctorMaxIter': maximum number of corrector iterations. + parameterListCritical: Recognized keys of critical approximant + parameters: + - 'S': total number of pivot samples current approximant relies + upon; + - 'samplerPivot': pivot sample point generator; + - 'SMarginal': total number of marginal samples current approximant + relies upon; + - 'samplerMarginal': marginal sample point generator. + approx_state: Whether to approximate state. + verbosity: Verbosity level. + POD: Whether to compute POD of snapshots. + matchingWeight: Weight for pole matching optimization. + cutOffTolerance: Tolerance for ignoring parasitic poles. + cutOffType: Rule for tolerance computation for parasitic poles. + S: Total number of pivot samples current approximant relies upon. + samplerPivot: Pivot sample point generator. + SMarginal: Total number of marginal samples current approximant relies + upon. + samplerMarginal: Marginal sample point generator. + polybasis: Type of polynomial basis for pivot interpolation. + polybasisMarginal: Type of polynomial basis for marginal interpolation. + greedyTol: uniform error tolerance for greedy algorithm. + collinearityTol: Collinearity tolerance for greedy algorithm. + maxIter: maximum number of greedy steps. + refinementRatio: ratio of training points to be exhausted before + training set refinement. + nTestPoints: number of starting training points. + trainSetGenerator: training sample points generator. + errorEstimatorKind: kind of error estimator. + MMarginal: Degree of marginal interpolant. + polydegreetypeMarginal: Type of polynomial degree for marginal. + radialDirectionalWeightsMarginal: Radial basis weights for marginal + interpolant. + nNearestNeighborMarginal: Number of marginal nearest neighbors + considered if polybasisMarginal allows. + interpRcond: Tolerance for pivot interpolation. + interpRcondMarginal: Tolerance for marginal interpolation. + robustTol: Tolerance for robust rational denominator management. + muBoundsPivot: list of bounds for pivot parameter values. + muBoundsMarginal: list of bounds for marginal parameter values. + samplingEngine: Sampling engine. + uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as + sampleList. + lastSolvedHF: Parameter(s) corresponding to last computed high fidelity + solution(s) as parameterList. + uApproxReduced: Reduced approximate solution(s) with parameter(s) + lastSolvedApprox as sampleList. + lastSolvedApproxReduced: Parameter(s) corresponding to last computed + reduced approximate solution(s) as parameterList. + uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as + sampleList. + lastSolvedApprox: Parameter(s) corresponding to last computed + approximate solution(s) as parameterList. + Q: Numpy 1D vector containing complex coefficients of approximant + denominator. + P: Numpy 2D vector whose columns are FE dofs of coefficients of + approximant numerator. + """ + + def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, + directionPivot : ListAny = [0], + approxParameters : DictAny = {}, approx_state : bool = False, + verbosity : int = 10, timestamp : bool = True): + self._preInit() + self._addParametersToList(toBeExcluded = ["sampler"]) + super().__init__(HFEngine = HFEngine, mu0 = mu0, + directionPivot = directionPivot, + approxParameters = approxParameters, + approx_state = approx_state, verbosity = verbosity, + timestamp = timestamp) + self._postInit() + + @property + def tModelType(self): + if hasattr(self, "_temporaryPivot"): return super().tModelType + from rrompy.reduction_methods.trained_model import \ + TrainedModelPivotedRational + return TrainedModelPivotedRational + + @property + def polybasis0(self): + if "_" in self.polybasis: + return self.polybasis.split("_")[0] + return self.polybasis + + def _polyvanderAuxiliary(self, mus, deg, *args): + degEff = [0] * self.npar + degEff[self.directionPivot[0]] = deg + return pv(mus, degEff, *args) + + def _marginalizeMiscellanea(self, forward:bool): + if forward: + self._m_mu0 = copy(self.mu0) + self._m_selfmus = copy(self.mus) + self._m_HFErescalingExp = copy(self.HFEngine.rescalingExp) + self._mu0 = parameterList(self.mu0(self.directionPivot)) + self._mus = parameterList(self.mus(self.directionPivot)) + self.HFEngine.rescalingExp = [self.HFEngine.rescalingExp[ + self.directionPivot[0]]] + else: + self._mu0 = self._m_mu0 + self._mus = self._m_selfmus + self.HFEngine.rescalingExp = self._m_HFErescalingExp + del self._m_mu0, self._m_selfmus, self._m_HFErescalingExp + + def _marginalizeTrainedModel(self, forward:bool): + if forward: + del self._temporaryPivot + self.trainedModel.data.mu0 = self.mu0 + self.trainedModel.data.scaleFactor = [1.] * self.npar + self.trainedModel.data.scaleFactor[self.directionPivot[0]] = ( + self.scaleFactor[0]) + self.trainedModel.data.rescalingExp = self.HFEngine.rescalingExp + Qc = np.zeros((len(self.trainedModel.data.Q.coeffs),) * self.npar, + dtype = self.trainedModel.data.Q.coeffs.dtype) + Pc = np.zeros((len(self.trainedModel.data.P.coeffs),) * self.npar + + (self.trainedModel.data.P.coeffs.shape[1],), + dtype = self.trainedModel.data.P.coeffs.dtype) + for j in range(len(self.trainedModel.data.Q.coeffs)): + Qc[(0,) * self.directionPivot[0] + (j,) + + (0,) * (self.npar - self.directionPivot[0] - 1)] = ( + self.trainedModel.data.Q.coeffs[j]) + for j in range(len(self.trainedModel.data.P.coeffs)): + for k in range(self.trainedModel.data.P.coeffs.shape[1]): + Pc[(0,) * self.directionPivot[0] + (j,) + + (0,) * (self.npar - self.directionPivot[0] - 1) + + (k,)] = self.trainedModel.data.P.coeffs[j, k] + self.trainedModel.data.Q.coeffs = Qc + self.trainedModel.data.P.coeffs = Pc + else: + self._temporaryPivot = 1 + self.trainedModel.data.mu0 = parameterList( + self.mu0(self.directionPivot)) + self.trainedModel.data.scaleFactor = self.scaleFactor + self.trainedModel.data.rescalingExp = self.HFEngine.rescalingExp[ + self.directionPivot[0]] + self.trainedModel.data.Q.coeffs = self.trainedModel.data.Q.coeffs[ + (0,) * self.directionPivot[0] + + (slice(None),) + + (0,) * (self.HFEngine.npar - 1 + - self.directionPivot[0])] + self.trainedModel.data.P.coeffs = self.trainedModel.data.P.coeffs[ + (0,) * self.directionPivot[0] + + (slice(None),) + + (0,) * (self.HFEngine.npar - 1 + - self.directionPivot[0])] + self.trainedModel.data.npar = self.npar + self.trainedModel.data.Q.npar = self.npar + self.trainedModel.data.P.npar = self.npar + + def errorEstimator(self, mus:Np1D, return_max : bool = False) -> Np1D: + """Standard residual-based error estimator.""" + self._marginalizeMiscellanea(True) + setupOK = self.setupApproxLocal() + self._marginalizeMiscellanea(False) + if not setupOK: + err = np.empty(len(mus)) + err[:] = np.nan + if not return_max: return err + return err, 0, np.nan + self._marginalizeTrainedModel(True) + errRes = super().errorEstimator(mus, return_max) + self._marginalizeTrainedModel(False) + return errRes + + def _preliminaryTraining(self): + """Initialize starting snapshots of solution map.""" + RROMPyAssert(self._mode, message = "Cannot start greedy algorithm.") + S = self.S + self.sampleBatchIdx, self.sampleBatchSize, self._S = -1, 0, 0 + nextBatchSize = 1 + while self._S + nextBatchSize <= S: + self.sampleBatchIdx += 1 + self.sampleBatchSize = nextBatchSize + self._S += self.sampleBatchSize + nextBatchSize = totalDegreeN(self.npar - 1, + self.sampleBatchIdx + 1) + self.resetSamples() + musPivot = parameterList(self.trainSetGenerator.generatePoints(self.S)[ + list(range(self.S))]) + muTestPivot = self.samplerPivot.generatePoints(self.nTestPoints) + idxPop = pruneSamples(muTestPivot ** self.HFEngine.rescalingExp[ + self.directionPivot[0]], + musPivot ** self.HFEngine.rescalingExp[ + self.directionPivot[0]], + 1e-10 * self.scaleFactor[0]) + self.mus = emptyParameterList() + self.mus.reset((self.S, self.npar + len(self.musMargLoc))) + muTestBase = emptyParameterList() + muTestBase.reset((len(muTestPivot), self.npar + len(self.musMargLoc))) + for k in range(self.S): + self.mus.data[k, self.directionPivot] = musPivot[k].data + self.mus.data[k, self.directionMarginal] = self.musMargLoc.data + for k in range(len(muTestPivot)): + muTestBase.data[k, self.directionPivot] = muTestPivot[k].data + muTestBase.data[k, self.directionMarginal] = self.musMargLoc.data + muTestBase.pop(idxPop) + muTestBase = muTestBase.sort() + muLast = copy(self.mus[-1]) + self.mus.pop() + if len(self.mus) > 0: + vbMng(self, "MAIN", + ("Adding first {} sample point{} at {} to training " + "set.").format(self.S - 1, "" + "s" * (self.S > 2), + self.mus), 2) + self.samplingEngine.iterSample(self.mus) + self._S = len(self.mus) + self._approxParameters["S"] = self.S + self.muTest = emptyParameterList() + self.muTest.reset((len(muTestBase) + 1, self.mus.shape[1])) + self.muTest.data[: -1] = muTestBase.data + self.muTest.data[-1] = muLast.data + + def setupApprox(self, plotEst : bool = False): + """Compute rational interpolant.""" + if self.checkComputedApprox(): + return True + RROMPyAssert(self._mode, message = "Cannot setup approximant.") + vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) + self.musMarginal = self.samplerMarginal.generatePoints(self.SMarginal) + S0 = copy(self.S) + Qs, Ps = [], [] + self.computeScaleFactor() + self._scaleFactorOldPivot = copy(self.scaleFactor) + self.scaleFactor = self.scaleFactorPivot + nparEff = self.npar + self._temporaryPivot = 1 + samplingEngs = [] + for j in range(len(self.musMarginal)): + self._S = S0 + self.musMargLoc = self.musMarginal[j] + RationalInterpolantGreedy.setupSampling(self) + self.trainedModel = None + self.verbosity -= 5 + super().setupApprox(plotEst) + self.verbosity += 5 + samplingEngs += [copy(self.samplingEngine)] + Qs = Qs + [copy(self.trainedModel.data.Q)] + Ps = Ps + [copy(self.trainedModel.data.P)] + del self.musMargLoc + # other finalization instructions + self.setupSampling() + self.samplingEngine.resetHistory(len(self.musMarginal)) + for j in range(len(self.musMarginal)): + self.samplingEngine.setsample(samplingEngs[j].samples, j, False) + self.samplingEngine.mus[j] = copy(samplingEngs[j].mus) + self.samplingEngine.nsamples[j] = samplingEngs[j].nsamples + self.samplingEngine.postprocessuBulk(j) + if j == 0: + self._mus = parameterList(samplingEngs[j].mus, nparEff) + else: + self._mus.append(samplingEngs[j].mus) + if self.POD: + self.samplingEngine.coalesceSamples(self.interpRcondMarginal) + postR = self.samplingEngine.RPODCoalesced + else: + self.samplingEngine.coalesceSamples() + postR = np.eye(self.samplingEngine.samplesCoalesced.shape[1]) + # update Ps by post-multiplication with suitable matrix + idxCurr = 0 + for j in range(len(self.musMarginal)): + nsj = Ps[j].coeffs.shape[1] + Ps[j].coeffs = dot(Ps[j].coeffs, + postR[:, idxCurr : idxCurr + nsj].T) + idxCurr = idxCurr + nsj + self.scaleFactor = self._scaleFactorOldPivot + del self._scaleFactorOldPivot, self._temporaryPivot + pMat = self.samplingEngine.samplesCoalesced.data + pMatEff = dot(self.HFEngine.C, pMat) + self.trainedModel = self.tModelType() + self.trainedModel.verbosity = self.verbosity + self.trainedModel.timestamp = self.timestamp + datadict = {"mu0": self.mu0, "projMat": pMatEff, + "scaleFactor": self.scaleFactor, + "rescalingExp": self.HFEngine.rescalingExp, + "directionPivot": self.directionPivot} + self.trainedModel.data = self.initializeModelData(datadict)[0] + self.trainedModel.data.mus = copy(self.mus) + self.trainedModel.data.musMarginal = copy(self.musMarginal) + self.trainedModel.data.marginalInterp = self._setupMarginalInterp() + self.trainedModel.data.Qs, self.trainedModel.data.Ps = Qs, Ps + vbMng(self, "INIT", "Matching poles.", 10) + self.trainedModel.initializeFromRational(self.HFEngine, + self.matchingWeight, self.POD, + self.approx_state) + vbMng(self, "DEL", "Done matching poles.", 10) + if not np.isinf(self.cutOffTolerance): + vbMng(self, "INIT", "Recompressing by cut-off.", 10) + msg = self.trainedModel.recompressByCutOff([-1., 1.], + self.cutOffTolerance, + self.cutOffType) + vbMng(self, "DEL", "Done recompressing." + msg, 10) + self.trainedModel.data.approxParameters = copy(self.approxParameters) + vbMng(self, "DEL", "Done setting up approximant.", 5) diff --git a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py index 159bbcd..b52a7a0 100644 --- a/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py +++ b/rrompy/reduction_methods/pivoted/rational_interpolant_pivoted.py @@ -1,366 +1,367 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # from copy import deepcopy as copy import numpy as np from .generic_pivoted_approximant import GenericPivotedApproximant from rrompy.reduction_methods.standard.rational_interpolant import ( RationalInterpolant) -from rrompy.utilities.poly_fitting.polynomial import polybases as ppb -from rrompy.utilities.poly_fitting.radial_basis import polybases as rbpb -from rrompy.utilities.poly_fitting.moving_least_squares import ( - polybases as mlspb) from rrompy.utilities.base.types import HFEng, DictAny, ListAny, paramVal from rrompy.utilities.base import verbosityManager as vbMng from rrompy.utilities.numerical import dot, nextDerivativeIndices -from rrompy.utilities.exception_manager import (RROMPyException, RROMPyAssert, - RROMPyWarning) +from rrompy.utilities.exception_manager import RROMPyAssert, RROMPyWarning +from rrompy.parameter import emptyParameterList __all__ = ['RationalInterpolantPivoted'] class RationalInterpolantPivoted(GenericPivotedApproximant, RationalInterpolant): """ ROM pivoted rational interpolant (with pole matching) computation for parametric problems. Args: HFEngine: HF problem solver. mu0(optional): Default parameter. Defaults to 0. directionPivot(optional): Pivot components. Defaults to [0]. approxParameters(optional): Dictionary containing values for main parameters of approximant. Recognized keys are: - 'POD': whether to compute POD of snapshots; defaults to True; - 'matchingWeight': weight for pole matching optimization; defaults to 1; - 'cutOffTolerance': tolerance for ignoring parasitic poles; defaults to np.inf; - 'cutOffType': rule for tolerance computation for parasitic poles; defaults to 'MAGNITUDE'; - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator; - 'polybasis': type of polynomial basis for pivot interpolation; defaults to 'MONOMIAL'; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; allowed values include 'MONOMIAL', 'CHEBYSHEV' and 'LEGENDRE'; defaults to 'MONOMIAL'; - 'M': degree of rational interpolant numerator; defaults to 0; - 'N': degree of rational interpolant denominator; defaults to 0; - - 'polydegreetype': type of polynomial degree; defaults to 'TOTAL'; - 'MMarginal': degree of marginal interpolant; defaults to 0; - 'polydegreetypeMarginal': type of polynomial degree for marginal; defaults to 'TOTAL'; - 'radialDirectionalWeights': radial basis weights for pivot numerator; defaults to 0, i.e. identity; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; defaults to 0, i.e. identity; - 'nNearestNeighbor': number of pivot nearest neighbors considered if polybasis allows; defaults to -1; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; defaults to -1; - 'interpRcond': tolerance for pivot interpolation; defaults to None; - 'interpRcondMarginal': tolerance for marginal interpolation; defaults to None; - 'robustTol': tolerance for robust rational denominator management; defaults to 0; - 'correctorForce': whether corrector should forcefully delete bad poles; defaults to False; - 'correctorTol': tolerance for corrector step; defaults to 0., i.e. no bad poles; - 'correctorMaxIter': maximum number of corrector iterations; defaults to 1. Defaults to empty dict. approx_state(optional): Whether to approximate state. Defaults to False. verbosity(optional): Verbosity level. Defaults to 10. Attributes: HFEngine: HF problem solver. mu0: Default parameter. directionPivot: Pivot components. mus: Array of snapshot parameters. musPivot: Array of pivot snapshot parameters. musMarginal: Array of marginal snapshot parameters. approxParameters: Dictionary containing values for main parameters of approximant. Recognized keys are in parameterList. parameterListSoft: Recognized keys of soft approximant parameters: - 'POD': whether to compute POD of snapshots; - 'matchingWeight': weight for pole matching optimization; - 'cutOffTolerance': tolerance for ignoring parasitic poles; - 'cutOffType': rule for tolerance computation for parasitic poles; - 'polybasis': type of polynomial basis for pivot interpolation; - 'polybasisMarginal': type of polynomial basis for marginal interpolation; - 'M': degree of rational interpolant numerator; - 'N': degree of rational interpolant denominator; - - 'polydegreetype': type of polynomial degree; - 'MMarginal': degree of marginal interpolant; - 'polydegreetypeMarginal': type of polynomial degree for marginal; - 'radialDirectionalWeights': radial basis weights for pivot numerator; - 'radialDirectionalWeightsMarginal': radial basis weights for marginal interpolant; - 'nNearestNeighbor': number of pivot nearest neighbors considered if polybasis allows; - 'nNearestNeighborMarginal': number of marginal nearest neighbors considered if polybasisMarginal allows; - 'interpRcond': tolerance for pivot interpolation; - 'interpRcondMarginal': tolerance for marginal interpolation; - 'robustTol': tolerance for robust rational denominator management; - 'correctorForce': whether corrector should forcefully delete bad poles; - 'correctorTol': tolerance for corrector step; - 'correctorMaxIter': maximum number of corrector iterations. parameterListCritical: Recognized keys of critical approximant parameters: - 'S': total number of pivot samples current approximant relies upon; - 'samplerPivot': pivot sample point generator; - 'SMarginal': total number of marginal samples current approximant relies upon; - 'samplerMarginal': marginal sample point generator. approx_state: Whether to approximate state. verbosity: Verbosity level. POD: Whether to compute POD of snapshots. matchingWeight: Weight for pole matching optimization. cutOffTolerance: Tolerance for ignoring parasitic poles. cutOffType: Rule for tolerance computation for parasitic poles. S: Total number of pivot samples current approximant relies upon. - sampler: Pivot sample point generator. + samplerPivot: Pivot sample point generator. + SMarginal: Total number of marginal samples current approximant relies + upon. + samplerMarginal: Marginal sample point generator. polybasis: Type of polynomial basis for pivot interpolation. polybasisMarginal: Type of polynomial basis for marginal interpolation. M: Numerator degree of approximant. N: Denominator degree of approximant. - polydegreetype: Type of polynomial degree. MMarginal: Degree of marginal interpolant. polydegreetypeMarginal: Type of polynomial degree for marginal. radialDirectionalWeights: Radial basis weights for pivot numerator. radialDirectionalWeightsMarginal: Radial basis weights for marginal interpolant. nNearestNeighbor: Number of pivot nearest neighbors considered if polybasis allows. nNearestNeighborMarginal: Number of marginal nearest neighbors considered if polybasisMarginal allows. interpRcond: Tolerance for pivot interpolation. interpRcondMarginal: Tolerance for marginal interpolation. robustTol: Tolerance for robust rational denominator management. correctorForce: Whether corrector should forcefully delete bad poles. correctorTol: Tolerance for corrector step. correctorMaxIter: Maximum number of corrector iterations. muBoundsPivot: list of bounds for pivot parameter values. muBoundsMarginal: list of bounds for marginal parameter values. samplingEngine: Sampling engine. uHF: High fidelity solution(s) with parameter(s) lastSolvedHF as sampleList. lastSolvedHF: Parameter(s) corresponding to last computed high fidelity solution(s) as parameterList. uApproxReduced: Reduced approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApproxReduced: Parameter(s) corresponding to last computed reduced approximate solution(s) as parameterList. uApprox: Approximate solution(s) with parameter(s) lastSolvedApprox as sampleList. lastSolvedApprox: Parameter(s) corresponding to last computed approximate solution(s) as parameterList. Q: Numpy 1D vector containing complex coefficients of approximant denominator. P: Numpy 2D vector whose columns are FE dofs of coefficients of approximant numerator. """ def __init__(self, HFEngine:HFEng, mu0 : paramVal = None, directionPivot : ListAny = [0], approxParameters : DictAny = {}, approx_state : bool = False, verbosity : int = 10, timestamp : bool = True): self._preInit() - self._addParametersToList(toBeExcluded = ["sampler"]) + self._addParametersToList(toBeExcluded = ["polydegreetype", "sampler"]) super().__init__(HFEngine = HFEngine, mu0 = mu0, directionPivot = directionPivot, approxParameters = approxParameters, approx_state = approx_state, verbosity = verbosity, timestamp = timestamp) self._postInit() @property def tModelType(self): from rrompy.reduction_methods.trained_model import \ TrainedModelPivotedRational return TrainedModelPivotedRational - def initializeModelData(self, datadict): - from rrompy.reduction_methods.trained_model import \ - TrainedModelPivotedData - return (TrainedModelPivotedData(datadict["mu0"], - datadict.pop("projMat"), - datadict["scaleFactor"], - datadict.pop("rescalingExp"), - datadict["directionPivot"]), - ["mu0", "scaleFactor", "directionPivot", "mus"]) - @property - def npar(self): - """Number of parameters.""" - if hasattr(self, "_temporaryPivot"): return self.nparPivot - return super().npar - - @property - def polybasis(self): - """Value of polybasis.""" - return self._polybasis - @polybasis.setter - def polybasis(self, polybasis): - try: - polybasis = polybasis.upper().strip().replace(" ","") - if polybasis not in ppb + rbpb + mlspb: - raise RROMPyException( - "Prescribed pivot polybasis not recognized.") - self._polybasis = polybasis - except: - RROMPyWarning(("Prescribed pivot polybasis not recognized. " - "Overriding to 'MONOMIAL'.")) - self._polybasis = "MONOMIAL" - self._approxParameters["polybasis"] = self.polybasis - + def polydegreetype(self): + """Value of polydegreetype.""" + return "TOTAL" + @polydegreetype.setter + def polydegreetype(self, polydegreetype): + RROMPyWarning(("polydegreetype is used just to simplify inheritance, " + "and its value cannot be changed from 'TOTAL'.")) + @property def polybasis0(self): if "_" in self.polybasis: return self.polybasis.split("_")[0] return self.polybasis @property def correctorTol(self): """Value of correctorTol.""" return self._correctorTol @correctorTol.setter def correctorTol(self, correctorTol): if correctorTol < 0. or (correctorTol > 0. and self.nparPivot > 1): RROMPyWarning(("Overriding prescribed corrector tolerance " "to 0.")) correctorTol = 0. self._correctorTol = correctorTol self._approxParameters["correctorTol"] = self.correctorTol @property def correctorMaxIter(self): """Value of correctorMaxIter.""" return self._correctorMaxIter @correctorMaxIter.setter def correctorMaxIter(self, correctorMaxIter): if correctorMaxIter < 1 or (correctorMaxIter > 1 and self.nparPivot > 1): RROMPyWarning(("Overriding prescribed max number of corrector " "iterations to 1.")) correctorMaxIter = 1 self._correctorMaxIter = correctorMaxIter self._approxParameters["correctorMaxIter"] = self.correctorMaxIter def _setupInterpolationIndices(self): """Setup parameters for polyvander.""" RROMPyAssert(self._mode, message = "Cannot setup interpolation indices.") if (self._musUniqueCN is None or len(self._reorder) != len(self.musPivot)): self._musUniqueCN, musIdxsTo, musIdxs, musCount = ( self.trainedModel.centerNormalizePivot(self.musPivot).unique( return_index = True, return_inverse = True, return_counts = True)) self._musUnique = self.musPivot[musIdxsTo] self._derIdxs = [None] * len(self._musUniqueCN) self._reorder = np.empty(len(musIdxs), dtype = int) filled = 0 for j, cnt in enumerate(musCount): self._derIdxs[j] = nextDerivativeIndices([], self.nparPivot, cnt) jIdx = np.nonzero(musIdxs == j)[0] self._reorder[jIdx] = np.arange(filled, filled + cnt) filled += cnt + def computeSnapshots(self): + """Compute snapshots of solution map.""" + RROMPyAssert(self._mode, + message = "Cannot start snapshot computation.") + if self.samplingEngine.nsamplesTot != self.S * self.SMarginal: + self.computeScaleFactor() + self.resetSamples() + vbMng(self, "INIT", "Starting computation of snapshots.", 5) + self.musPivot = self.samplerPivot.generatePoints(self.S) + self.musMarginal = self.samplerMarginal.generatePoints( + self.SMarginal) + self.mus = emptyParameterList() + self.mus.reset((self.S * self.SMarginal, self.HFEngine.npar)) + self.samplingEngine.resetHistory(self.SMarginal) + for j, muMarg in enumerate(self.musMarginal): + for k in range(j * self.S, (j + 1) * self.S): + self.mus.data[k, self.directionPivot] = ( + self.musPivot[k - j * self.S].data) + self.mus.data[k, self.directionMarginal] = muMarg.data + self.samplingEngine.iterSample(self.musPivot, self.musMarginal) + if self.POD: + self.samplingEngine.coalesceSamples(self.interpRcondMarginal) + else: + self.samplingEngine.coalesceSamples() + vbMng(self, "DEL", "Done computing snapshots.", 5) + def setupApprox(self): """ Compute rational interpolant. SVD-based robust eigenvalue management. """ if self.checkComputedApprox(): return RROMPyAssert(self._mode, message = "Cannot setup approximant.") vbMng(self, "INIT", "Setting up {}.". format(self.name()), 5) self.computeSnapshots() pMat = self.samplingEngine.samplesCoalesced.data pMatEff = dot(self.HFEngine.C, pMat) if self.approx_state else pMat if self.trainedModel is None: self.trainedModel = self.tModelType() self.trainedModel.verbosity = self.verbosity self.trainedModel.timestamp = self.timestamp datadict = {"mu0": self.mu0, "projMat": pMatEff, "scaleFactor": self.scaleFactor, "rescalingExp": self.HFEngine.rescalingExp, "directionPivot": self.directionPivot} self.trainedModel.data = self.initializeModelData(datadict)[0] else: self.trainedModel = self.trainedModel self.trainedModel.data.projMat = copy(pMatEff) self.trainedModel.data.musPivot = copy(self.musPivot) self.trainedModel.data.musMarginal = copy(self.musMarginal) self.trainedModel.data.marginalInterp = self._setupMarginalInterp() N0 = copy(self.N) Qs, Ps = [], [] self._temporaryPivot = 1 if self.POD: self._RPODOldPivot = copy(self.samplingEngine.RPODCoalesced) else: self._samplesOldPivot = copy(self.samplingEngine.samples) self._scaleFactorOldPivot = copy(self.scaleFactor) self.scaleFactor = self.scaleFactorPivot for j in range(len(self.musMarginal)): self._N = N0 if self.POD: self.samplingEngine.RPOD = ( self._RPODOldPivot[:, j * self.S : (j + 1) * self.S]) else: self.samplingEngine.samples = self._samplesOldPivot[j] + self.verbosity -= 5 self._iterCorrector() + self.verbosity += 5 Qs = Qs + [copy(self.trainedModel.data.Q)] Ps = Ps + [copy(self.trainedModel.data.P)] del self.trainedModel.data.Q, self.trainedModel.data.P if self.POD: - self.samplingEngine.RPODCoalesced = self._RPODOldPivot + self.samplingEngine.RPODCoalesced = copy(self._RPODOldPivot) + del self._RPODOldPivot else: - self.samplingEngine.samples = self._samplesOldPivot + self.samplingEngine.samples = copy(self._samplesOldPivot) + del self._samplesOldPivot self.scaleFactor = self._scaleFactorOldPivot - del self._temporaryPivot + del self._temporaryPivot, self._scaleFactorOldPivot self.trainedModel.data.Qs, self.trainedModel.data.Ps = Qs, Ps vbMng(self, "INIT", "Matching poles.", 10) self.trainedModel.initializeFromRational(self.HFEngine, self.matchingWeight, self.POD, self.approx_state) vbMng(self, "DEL", "Done matching poles.", 10) if not np.isinf(self.cutOffTolerance): vbMng(self, "INIT", "Recompressing by cut-off.", 10) msg = self.trainedModel.recompressByCutOff([-1., 1.], self.cutOffTolerance, self.cutOffType) vbMng(self, "DEL", "Done recompressing." + msg, 10) self.trainedModel.data.approxParameters = copy(self.approxParameters) vbMng(self, "DEL", "Done setting up approximant.", 5) diff --git a/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py b/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py index 4aeb57b..57c0849 100644 --- a/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py +++ b/rrompy/reduction_methods/trained_model/trained_model_pivoted_general.py @@ -1,386 +1,386 @@ # Copyright (C) 2018 by the RROMPy authors # # This file is part of RROMPy. # # RROMPy is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RROMPy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with RROMPy. If not, see . # import numpy as np from scipy.special import factorial as fact from itertools import combinations from .trained_model import TrainedModel from rrompy.utilities.base.types import (Np1D, Tuple, List, ListAny, paramVal, paramList, sampList, HFEng) from rrompy.utilities.base import verbosityManager as vbMng, freepar as fp from rrompy.utilities.numerical import pointMatching, chordalMetricTable from rrompy.utilities.poly_fitting.heaviside import HeavisideInterpolator as HI from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert from rrompy.parameter import checkParameter, checkParameterList from rrompy.sampling import emptySampleList, sampleList __all__ = ['TrainedModelPivotedGeneral'] class TrainedModelPivotedGeneral(TrainedModel): """ ROM approximant evaluation for pivoted approximants (with pole matching). Attributes: Data: dictionary with all that can be pickled. """ def centerNormalizePivot(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Pivot. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.nparPivot)[0] if mu0 is None: mu0 = self.data.mu0Pivot rad = ((mu ** self.data.rescalingExpPivot - mu0 ** self.data.rescalingExpPivot) / self.data.scaleFactorPivot) return rad def centerNormalizeMarginal(self, mu : paramList = [], mu0 : paramVal = None) -> paramList: """ Compute normalized parameter to be plugged into approximant. Args: mu: Parameter(s) 1. mu0: Parameter(s) 2. If None, set to self.data.mu0Marginal. Returns: Normalized parameter. """ mu = checkParameterList(mu, self.data.nparMarginal)[0] if mu0 is None: mu0 = self.data.mu0Marginal rad = ((mu ** self.data.rescalingExpMarginal - mu0 ** self.data.rescalingExpMarginal) / self.data.scaleFactorMarginal) return rad def initializeFromLists(self, poles:ListAny, coeffs:ListAny, basis:str, HFEngine:HFEng, matchingWeight : float = 1., POD : bool = True, is_state : bool = True): """Initialize Heaviside representation.""" musM = self.data.musMarginal margAbsDist = np.sum(np.abs(np.repeat(musM.data, len(musM), 0) - np.tile(musM.data, [len(musM), 1]) ), axis = 1).reshape(len(musM), len(musM)) N = len(poles[0]) explored = [0] unexplored = list(range(1, len(musM))) badPoles = list(np.where(np.isinf(poles[0]))[0]) for _ in range(1, len(musM)): minIdx = np.argmin(np.concatenate([margAbsDist[ex, unexplored] \ for ex in explored])) minIex = explored[minIdx // len(unexplored)] minIunex = unexplored[minIdx % len(unexplored)] dist = chordalMetricTable(poles[minIex], poles[minIunex]) yInf = np.where(np.isinf(poles[minIunex]))[0] - badPoles += list(yInf) if matchingWeight != 0: resex = coeffs[minIex][: N] resunex = coeffs[minIunex][: N] xInf = np.where(np.isinf(poles[minIex]))[0] if POD: distR = resunex.dot(resex.T.conj()) normex = np.linalg.norm(resex, axis = 1) normunex = np.linalg.norm(resunex, axis = 1) else: resex = self.data.projMat.dot(resex.T) resunex = self.data.projMat.dot(resunex.T) distR = HFEngine.innerProduct(resex, resunex, is_state = is_state) normex = HFEngine.norm(resex, is_state = is_state) normunex = HFEngine.norm(resunex, is_state = is_state) normex[xInf], normunex[yInf] = 1., 1. distR = (distR / normex).T / normunex distR = np.abs(distR) distR[distR > 1.] = 1. distR[xInf], distR[:, yInf] = 0., 0. dist += 2. / np.pi * matchingWeight * np.arccos(distR) reordering = pointMatching(dist) poles[minIunex] = poles[minIunex][reordering] coeffs[minIunex][: N] = coeffs[minIunex][reordering] explored += [minIunex] unexplored.remove(minIunex) + badPoles += list(np.where(np.isinf(poles[minIunex]))[0]) goodPoles = [i for i in range(N) if i not in badPoles] goodPolesExt = goodPoles + list(range(N, len(coeffs[0]))) HIs = [] for pls, cfs in zip(poles, coeffs): hsi = HI() hsi.poles = pls[goodPoles] hsi.coeffs = cfs[goodPolesExt] hsi.npar = 1 hsi.polybasis = basis HIs += [hsi] self.data.HIs = HIs def recompressByCutOff(self, murange : Tuple[float, float] = [- 1., 1.], tol : float = np.inf, rtype : str = "MAGNITUDE"): if np.isinf(tol): return " No poles erased." N = len(self.data.HIs[0].poles) mu0 = np.mean(murange) musig = murange[0] - mu0 if np.isclose(musig, 0.): radius = lambda x: np.abs(x - mu0) else: if rtype == "MAGNITUDE": murdir = (murange[0] - mu0) / np.abs(musig) def radius(x): scalprod = (x - mu0) * murdir.conj() / np.abs(musig) rescalepar = np.abs(np.real(scalprod)) rescaleort = np.abs(np.imag(scalprod)) return ((rescalepar - 1.) ** 2. * (rescalepar > 1.) + rescaleort ** 2.) ** .5 else:#if rtype == "POTENTIAL": def radius(x): rescale = (x - mu0) / musig return np.max(np.abs(rescale * np.array([-1., 1.]) + (rescale ** 2. - 1) ** .5)) - 1. keepPole, removePole = [], [] for j in range(N): for hi in self.data.HIs: if radius(hi.poles[j]) <= tol: keepPole += [j] break if len(keepPole) == 0 or keepPole[-1] != j: removePole += [j] if len(keepPole) == N: return " No poles erased." keepCoeff = keepPole + [N] keepCoeff = keepCoeff + list(range(N + 1,len(self.data.HIs[0].coeffs))) for hi in self.data.HIs: polyCorrection = np.zeros_like(hi.coeffs[0, :]) for j in removePole: polyCorrection += hi.coeffs[j, :] / (mu0 - hi.poles[j]) if len(hi.coeffs) == N: hi.coeffs = np.vstack((hi.coeffs, polyCorrection)) else: hi.coeffs[N, :] += polyCorrection hi.poles = hi.poles[keepPole] hi.coeffs = hi.coeffs[keepCoeff, :] return (" Erased {} pole".format(len(removePole)) + "s" * (len(removePole) > 1) + ".") def interpolateMarginal(self, mu : paramList = [], samples : ListAny = [], der : List[int] = None, scl : Np1D = None) -> sampList: """ Evaluate marginal interpolator at arbitrary marginal parameter. Args: mu: Target parameter. samples: Objects to interpolate. der(optional): Derivatives to take before evaluation. """ mu = checkParameterList(mu, self.data.nparMarginal)[0] sList = isinstance(samples[0], sampleList) sEff = [None] * len(samples) for j in range(len(samples)): if sList: sEff[j] = samples[j].data else: sEff[j] = samples[j] try: dtype = sEff[0].dtype except: dtype = sEff[0][0].dtype vbMng(self, "INIT", "Interpolating marginal at mu = {}.".format(mu), 95) muC = self.centerNormalizeMarginal(mu) p = emptySampleList() p.reset((len(sEff[0]), len(muC)), dtype = dtype) p.data[:] = 0. if len(sEff[0]) > 0: for mIj, spj in zip(self.data.marginalInterp, sEff): p = p + spj.reshape(len(sEff[0]), - 1) * mIj(muC, der, scl) vbMng(self, "DEL", "Done interpolating marginal.", 95) if not sList: p = p.data.flatten() return p def interpolateMarginalInterpolator(self, mu : paramVal = []) -> Np1D: """Obtain interpolated approximant interpolator.""" mu = checkParameter(mu, self.data.nparMarginal)[0] hsi = HI() hsi.poles = self.interpolateMarginalPoles(mu) hsi.coeffs = self.interpolateMarginalCoeffs(mu) hsi.npar = 1 hsi.polybasis = self.data.HIs[0].polybasis return hsi def interpolateMarginalPoles(self, mu : paramList = []) -> Np1D: """Obtain interpolated approximant poles.""" mu = checkParameterList(mu, self.data.nparMarginal)[0] return self.interpolateMarginal(mu, [hi.poles for hi in self.data.HIs]) def interpolateMarginalCoeffs(self, mu : paramList = []) -> Np1D: """Obtain interpolated approximant coefficients.""" mu = checkParameterList(mu, self.data.nparMarginal)[0] cs = self.interpolateMarginal(mu, [hi.coeffs for hi in self.data.HIs]) if isinstance(cs, (list, tuple,)): cs = np.array(cs) return cs.reshape(self.data.HIs[0].coeffs.shape) def getApproxReduced(self, mu : paramList = []) -> sampList: """ Evaluate reduced representation of approximant at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] if (not hasattr(self, "lastSolvedApproxReduced") or self.lastSolvedApproxReduced != mu): vbMng(self, "INIT", "Evaluating approximant at mu = {}.".format(mu), 12) self.uApproxReduced = emptySampleList() for i, muPL in enumerate(mu): muL = self.centerNormalizePivot([muPL(0, x) \ for x in self.data.directionPivot]) muM = [muPL(0, x) for x in self.data.directionMarginal] vbMng(self, "INIT", "Assembling reduced model for mu = {}.".format(muPL), 87) hsL = self.interpolateMarginalInterpolator(muM) vbMng(self, "DEL", "Done assembling reduced model.", 87) uAppR = hsL(muL) if i == 0: #self.data.HIs[0].coeffs.shape[1], len(mu) self.uApproxReduced.reset((len(uAppR), len(mu)), dtype = uAppR.dtype) self.uApproxReduced[i] = uAppR vbMng(self, "DEL", "Done evaluating approximant.", 12) self.lastSolvedApproxReduced = mu return self.uApproxReduced def getPVal(self, mu : paramList = []) -> sampList: """ Evaluate rational numerator at arbitrary parameter. Args: mu: Target parameter. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] p = emptySampleList() p.reset((len(self.data.HIs[0].coeffs.shape[1]), len(mu))) for i, muPL in enumerate(mu): muL = self.centerNormalizePivot([muPL(0, x) \ for x in self.data.directionPivot]) muM = [muPL(0, x) for x in self.data.directionMarginal] hsL = self.interpolateMarginalInterpolator(muM) p[i] = hsL(muL) * np.prod(muL(0, 0) - hsL.poles) return p def getQVal(self, mu:Np1D, der : List[int] = None, scl : Np1D = None) -> Np1D: """ Evaluate rational denominator at arbitrary parameter. Args: mu: Target parameter. der(optional): Derivatives to take before evaluation. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") mu = checkParameterList(mu, self.data.npar)[0] muP = self.centerNormalizePivot(checkParameterList( mu.data[:, self.data.directionPivot], self.data.nparPivot)[0]) muM = checkParameterList(mu.data[:, self.data.directionMarginal], self.data.nparMarginal)[0] if der is None: derP, derM = 0, [0] else: derP = der[self.data.directionPivot[0]] derM = [der[x] for x in self.data.directionMarginal] if np.any(np.array(derM) != 0): raise RROMPyException(("Derivatives of Q with respect to marginal " "parameters not allowed.")) sclP = 1 if scl is None else scl[self.data.directionPivot[0]] derVal = np.zeros(len(mu), dtype = np.complex) N = len(self.data.HIs[0].poles) if derP == N: derVal[:] = 1. elif derP >= 0 and derP < N: pls = self.interpolateMarginalPoles(muM).reshape(-1, len(mu)).T plsDist = muP.data.reshape(-1, 1) - pls for terms in combinations(np.arange(N), N - derP): derVal += np.prod(plsDist[:, list(terms)], axis = 1) return sclP ** derP * fact(derP) * derVal def getPoles(self, *args, **kwargs) -> Np1D: """ Obtain approximant poles. Returns: Numpy complex vector of poles. """ RROMPyAssert(self.data.nparPivot, 1, "Number of pivot parameters") if len(args) + len(kwargs) > 1: raise RROMPyException(("Wrong number of parameters passed. " "Only 1 available.")) elif len(args) + len(kwargs) == 1: if len(args) == 1: mVals = args[0] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) else: mVals = [fp] try: rDim = mVals.index(fp) if rDim < len(mVals) - 1 and fp in mVals[rDim + 1 :]: raise except: raise RROMPyException(("Exactly 1 'freepar' entry in " "marginalVals must be provided.")) if rDim != self.data.directionPivot[0]: raise RROMPyException(("'freepar' entry in marginalVals must " "coincide with pivot direction.")) mVals[rDim] = self.data.mu0(rDim) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] roots = np.sort(np.array(self.interpolateMarginalPoles(mMarg))) return np.power(self.data.mu0(rDim) ** self.data.rescalingExp[rDim] + self.data.scaleFactor[rDim] * roots, 1. / self.data.rescalingExp[rDim]) def getResidues(self, *args, **kwargs) -> Np1D: """ Obtain approximant residues. Returns: Numpy matrix with residues as columns. """ pls = self.getPoles(*args, **kwargs) if len(args) == 1: mVals = args[0] elif len(args) == 0: mVals = [None] else: mVals = kwargs["marginalVals"] if not hasattr(mVals, "__len__"): mVals = [mVals] mVals = list(mVals) rDim = mVals.index(fp) mMarg = [mVals[j] for j in range(len(mVals)) if j != rDim] residues = self.interpolateMarginalCoeffs(mMarg)[: len(pls)] res = self.data.projMat.dot(residues.T) return pls, res