Page MenuHomec4science

hfengine_base.py
No OneTemporary

File Metadata

Created
Fri, May 10, 22:03

hfengine_base.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from abc import abstractmethod
import numpy as np
import scipy.sparse as scsp
from numbers import Number
from copy import copy as softcopy, deepcopy as copy
from rrompy.utilities.base.decorators import nonaffine_construct
from rrompy.utilities.base.types import (Np1D, Np2D, List, DictAny, paramVal,
paramList, sampList)
from rrompy.utilities.numerical import solve as tsolve, dot, customPInv
from rrompy.utilities.numerical.rayleigh_quotient_iteration import (
rayleighQuotientIteration)
from rrompy.utilities.exception_manager import RROMPyAssert
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.solver.linear_solver import setupSolver
try:
from mpi4py import MPI
_MPI_IS_LOADED = True
from rrompy.utilities.base.partitioning import partition
except ImportError:
_MPI_IS_LOADED = False
__all__ = ['HFEngineBase']
class HFEngineBase:
"""Generic solver for parametric problems."""
_forceSerial = False
def __init__(self, verbosity : int = 10, timestamp : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
self.setSolver("SPSOLVE", {"use_umfpack" : False})
self.npar = 0
self._C = None
self.outputNormMatrix = 1.
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
def __deepcopy__(self, memo):
return softcopy(self)
@property
def npar(self):
"""Value of npar."""
return self._npar
@npar.setter
def npar(self, npar):
nparOld = self._npar if hasattr(self, "_npar") else -1
if npar != nparOld:
self.rescalingExp = [1.] * npar
self._npar = npar
@property
def spacedim(self):
return 1
def checkParameter(self, mu:paramVal):
muP = checkParameter(mu, self.npar)
if self.npar == 0: muP.reset((1, 0), muP.dtype)
return muP
def checkParameterList(self, mu:paramList):
muL = checkParameterList(mu, self.npar)
if self.npar == 0: muL[0].reset((1, 0), muL[0].dtype)
return muL
def buildEnergyNormForm(self):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self.energyNormMatrix = 1.
def buildEnergyNormDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
self.energyNormDualMatrix = 1.
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False,
dual : bool = False, is_state : bool = True) -> Np2D:
"""Scalar product."""
if is_state or self.isCEye:
if dual:
if not hasattr(self, "energyNormDualMatrix"):
self.buildEnergyNormDualForm()
energyMat = self.energyNormDualMatrix
else:
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
energyMat = self.energyNormMatrix
else:
energyMat = self.outputNormMatrix
if not isinstance(u, (np.ndarray,)): u = u.data
if not isinstance(v, (np.ndarray,)): v = v.data
if onlyDiag:
return np.sum(dot(energyMat, u) * v.conj(), axis = 0)
return dot(dot(energyMat, u).T, v.conj()).T
def norm(self, u:Np2D, dual : bool = False,
is_state : bool = True) -> Np1D:
return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual,
is_state = is_state)) ** .5
def baselineA(self):
"""Return 0 of shape consistent with operator of linear system."""
if (hasattr(self, "As") and hasattr(self.As, "__len__")
and self.As[0] is not None):
d = self.As[0].shape[0]
else:
d = self.spacedim
return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)),
shape = (d, d), dtype = np.complex)
def baselineb(self):
"""Return 0 of shape consistent with RHS of linear system."""
return np.zeros(self.spacedim, dtype = np.complex)
@nonaffine_construct
@abstractmethod
def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
return
@nonaffine_construct
@abstractmethod
def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
return
@property
def C(self):
"""Value of C."""
if self._C is None: self._C = 1.
return self._C
@property
def isCEye(self):
return isinstance(self.C, Number)
def applyC(self, u:sampList):
"""Apply LHS of linear system."""
return dot(self.C, u)
def applyCpInv(self, u:sampList):
"""Apply pseudoinverse of LHS of linear system."""
return dot(customPInv(self.C), u)
def setSolver(self, solverType:str, solverArgs : DictAny = {}):
"""Choose solver type and parameters."""
self._solver, self._solverArgs = setupSolver(solverType, solverArgs)
def solve(self, mu : paramList = [], RHS : sampList = None,
return_state : bool = False) -> sampList:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
return_state: whether to return state before multiplication by c.
Defaults to False.
"""
from rrompy.sampling import sampleList, emptySampleList
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)[0]
if len(mu) == 0: return emptySampleList()
if _MPI_IS_LOADED and not self._forceSerial:
comm = MPI.COMM_WORLD
crank = comm.Get_rank()
if crank == 0:
mupart, idxpart = partition(mu, comm.Get_size(), True)
else:
mupart, idxpart = None, None
mu0 = self.checkParameterList(mu)[0]
mu = self.checkParameterList(comm.scatter(mupart, root = 0))[0]
idxpart = comm.bcast(idxpart, root = 0)
if len(mu) == 0:
sol = emptySampleList()
else:
if RHS is None:
RHS = sampleList([self.b(m) for m in mu])
else:
RHS = sampleList(RHS)
if len(RHS) > 1: RHS = sampleList([RHS[x] for x in idxpart])
mult = 0 if len(RHS) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size")
for j in range(len(mu)):
u = tsolve(self.A(mu[j]), RHS[mult * j], self._solver,
self._solverArgs)
if j == 0:
sol = emptySampleList()
sol.reset((len(u), len(mu)), dtype = u.dtype)
sol[j] = u
if not return_state:
sol = sampleList(self.applyC(sol.data))
if _MPI_IS_LOADED and not self._forceSerial:
solsdata = comm.allgather(sol.data.T)
sol = emptySampleList()
sol.reset((solsdata[0].shape[1], len(mu0)),
dtype = solsdata[0].dtype)
for idx, dat in zip(idxpart, solsdata):
if len(idx) > 0: sol[idx] = dat
return sol
def residual(self, mu : paramList = [], u : sampList = None,
post_c : bool = True) -> sampList:
"""
Find residual of linear system for given approximate solution.
Args:
mu: parameter value.
u: numpy complex array with function dofs. If None, set to 0.
post_c: whether to post-process using c. Defaults to True.
"""
from rrompy.sampling import sampleList, emptySampleList
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)[0]
if len(mu) == 0: return emptySampleList()
v = sampleList(np.zeros((self.spacedim, len(mu))))
if u is not None: v = v + sampleList(u)
for j in range(len(mu)):
r = self.b(mu[j]) - dot(self.A(mu[j]), v[j])
if post_c:
if j == 0: res = np.empty((len(r), len(mu)), dtype = r.dtype)
res[:, j] = r
else:
if j == 0:
res = emptySampleList()
res.reset((len(r), len(mu)), dtype = r.dtype)
res[j] = r
if post_c: res = sampleList(self.applyC(res))
return res
def stabilityFactor(self, mu : paramList = [], u : sampList = None,
nIterP : int = 10, nIterR : int = 10) -> sampList:
"""
Find stability factor of matrix of linear system using iterative
inverse power iteration- and Rayleigh quotient-based procedure.
Args:
mu: parameter values.
u: numpy complex arrays with function dofs.
nIterP: number of iterations of power method.
nIterR: number of iterations of Rayleigh quotient method.
"""
from rrompy.sampling import sampleList
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)[0]
u = sampleList(u)
if len(u) != len(mu):
u = sampleList(np.tile(u.data.reshape(-1, 1), (1, len(mu))))
stabFact = np.empty(len(mu), dtype = float)
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
for j in range(len(mu)):
stabFact[j] = rayleighQuotientIteration(self.A(mu[j]), u[j],
self.energyNormMatrix,
0., nIterP, nIterR,
self._solver,
self._solverArgs)
return stabFact

Event Timeline