Page MenuHomec4science

hfengine_base.py
No OneTemporary

File Metadata

Created
Sun, May 12, 12:59

hfengine_base.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from abc import abstractmethod
import numpy as np
from numbers import Number
from copy import copy as softcopy
from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, List, DictAny,
paramVal, paramList, sampList)
from rrompy.utilities.numerical import (solve as tsolve, dot, customPInv,
rayleighQuotientIteration)
from rrompy.utilities.exception_manager import RROMPyAssert
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList, emptySampleList
from rrompy.solver import setupSolver
__all__ = ['HFEngineBase']
class HFEngineBase:
"""Generic solver for parametric problems."""
def __init__(self, verbosity : int = 10, timestamp : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
self.setSolver("SPSOLVE", {"use_umfpack" : False})
self.npar = 0
self._C = None
self.outputNormMatrix = 1.
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
def __deepcopy__(self, memo):
return softcopy(self)
@property
def npar(self):
"""Value of npar."""
return self._npar
@npar.setter
def npar(self, npar):
nparOld = self._npar if hasattr(self, "_npar") else -1
if npar != nparOld:
self.rescalingExp = [1.] * npar
self._npar = npar
@property
@abstractmethod
def spacedim(self):
return 1
def checkParameter(self, mu:paramVal):
return checkParameter(mu, self.npar)
def checkParameterList(self, mu:paramList):
return checkParameterList(mu, self.npar)
def buildEnergyNormForm(self):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self.energyNormMatrix = 1.
def buildEnergyNormDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product.
"""
self.energyNormDualMatrix = 1.
def buildEnergyNormPartialDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
self.energyNormPartialDualMatrix = 1.
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False,
dual : bool = False, is_state : bool = True) -> Np2D:
"""Scalar product."""
if is_state or self.isCEye:
if dual:
if not hasattr(self, "energyNormPartialDualMatrix"):
self.buildEnergyNormPartialDualForm()
energyMat = self.energyNormPartialDualMatrix
else:
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
energyMat = self.energyNormMatrix
else:
energyMat = self.outputNormMatrix
if not isinstance(u, (np.ndarray,)): u = u.data
if not isinstance(v, (np.ndarray,)): v = v.data
if onlyDiag:
return np.sum(dot(energyMat, u) * v.conj(), axis = 0)
return dot(dot(energyMat, u).T, v.conj()).T
def norm(self, u:Np2D, dual : bool = False,
is_state : bool = True) -> Np1D:
return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual,
is_state = is_state)) ** .5
@abstractmethod
def A(self, mu : paramVal = [], der : List[int] = 0) -> ScOp:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
return
@abstractmethod
def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
return
@property
def C(self):
"""Value of C."""
if self._C is None: self._C = 1.
return self._C
@property
def isCEye(self):
return isinstance(self.C, Number)
def applyC(self, u:sampList):
"""Apply LHS of linear system."""
return dot(self.C, u)
def applyCpInv(self, u:sampList):
"""Apply pseudoinverse of LHS of linear system."""
return dot(customPInv(self.C), u)
_isStateShiftZero = True
def stateShift(self, mu : paramVal = []) -> Np1D:
return np.zeros((self.spacedim, len(mu)))
_isOutputShiftZero = True
def outputShift(self, mu : paramVal = []) -> Np1D:
return self.applyC(self.stateShift(mu))
def setSolver(self, solverType:str, solverArgs : DictAny = {}):
"""Choose solver type and parameters."""
self._solver, self._solverArgs = setupSolver(solverType, solverArgs)
def solve(self, mu : paramList = [], RHS : sampList = None,
return_state : bool = False, verbose : bool = False) -> sampList:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
return_state: whether to return state before multiplication by c.
Defaults to False.
verbose: whether to notify for each solution computed. Defaults to
False.
"""
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)[0]
if self.npar == 0: mu.reset((1, self.npar), mu.dtype)
if len(mu) == 0: return emptySampleList()
if RHS is None: RHS = [self.b(m) for m in mu]
RHS = sampleList(RHS)
mult = 0 if len(RHS) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size")
for j in range(len(mu)):
u = tsolve(self.A(mu[j]), RHS[mult * j], self._solver,
self._solverArgs)
if return_state:
if j == 0:
sol = emptySampleList()
sol.reset((len(u), len(mu)), dtype = u.dtype)
sol[j] = u
else:
if j == 0: sol = np.empty((len(u), len(mu)), dtype = u.dtype)
sol[:, j] = u
if verbose:
print("." * (j % 5 != 4) + "*" * (j % 5 == 4), end = "")
if not return_state:
sol = sampleList(self.applyC(sol) - self.outputShift(mu))
else:
sol = sampleList(sol - self.stateShift(mu))
if verbose: print()
return sol
def residual(self, mu : paramList = [], u : sampList = None,
post_c : bool = True) -> sampList:
"""
Find residual of linear system for given approximate solution.
Args:
mu: parameter value.
u: numpy complex array with function dofs. If None, set to 0.
post_c: whether to post-process using c. Defaults to True.
"""
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)[0]
if self.npar == 0: mu.reset((1, self.npar), mu.dtype)
if len(mu) == 0: return emptySampleList()
v = sampleList(self.stateShift(mu))
if u is not None: v = v + sampleList(u)
for j in range(len(mu)):
r = self.b(mu[j]) - dot(self.A(mu[j]), v[j])
if post_c:
if j == 0: res = np.empty((len(r), len(mu)), dtype = r.dtype)
res[:, j] = r
else:
if j == 0:
res = emptySampleList()
res.reset((len(r), len(mu)), dtype = r.dtype)
res[j] = r
if post_c: res = sampleList(self.applyC(res))
return res
def stabilityFactor(self, mu : paramList = [], u : sampList = None,
nIterP : int = 10, nIterR : int = 10) -> sampList:
"""
Find stability factor of matrix of linear system using iterative
inverse power iteration- and Rayleigh quotient-based procedure.
Args:
mu: parameter values.
u: numpy complex arrays with function dofs.
nIterP: number of iterations of power method.
nIterR: number of iterations of Rayleigh quotient method.
"""
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)[0]
if self.npar == 0: mu.reset((1, self.npar), mu.dtype)
u = sampleList(u)
solShift = self.stateShift(mu)
if len(u) == len(mu):
u = u + solShift
else:
u = sampleList(solShift) + np.tile(u.data, (1, len(mu)))
stabFact = np.empty(len(mu), dtype = float)
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
for j in range(len(mu)):
stabFact[j] = rayleighQuotientIteration(self.A(mu[j]), u[j],
self.energyNormMatrix,
0., nIterP, nIterR,
self._solver,
self._solverArgs)
return stabFact

Event Timeline