Page MenuHomec4science

matrix_engine_base.py
No OneTemporary

File Metadata

Created
Wed, May 8, 22:56

matrix_engine_base.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from abc import abstractmethod
import numpy as np
import scipy.sparse as scsp
from matplotlib import pyplot as plt
from copy import deepcopy as copy, copy as softcopy
from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, strLst, Tuple, List,
ListAny, DictAny, paramVal, paramList,
sampList)
from rrompy.utilities.base import (purgeList, getNewFilename,
verbosityManager as vbMng)
from rrompy.utilities.numerical import multibinom
from rrompy.utilities.poly_fitting.polynomial import (
hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI)
from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList, emptySampleList
from rrompy.solver import setupSolver, Np2DLikeEye
from rrompy.solver.scipy import tensorizeLS, detensorizeLS
__all__ = ['MatrixEngineBase']
class MatrixEngineBase:
"""
Generic solver for parametric matrix problems.
Attributes:
verbosity: Verbosity level.
As: Scipy sparse array representation (in CSC format) of As.
bs: Numpy array representation of bs.
bsH: Numpy array representation of homogeneized bs.
energyNormMatrix: Scipy sparse matrix representing inner product.
energyNormDualMatrix: Scipy sparse matrix representing dual inner
product.
dualityMatrix: Scipy sparse matrix representing duality.
energyNormPartialDualMatrix: Scipy sparse matrix representing dual
inner product without duality.
"""
_solveBatchSize = 1
def __init__(self, verbosity : int = 10, timestamp : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
self.nAs, self.nbs = 1, 1
self.setSolver("SPSOLVE", {"use_umfpack" : False})
self.npar = 0
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
def __deepcopy__(self, memo):
return softcopy(self)
@property
def npar(self):
"""Value of npar."""
return self._npar
@npar.setter
def npar(self, npar):
nparOld = self._npar if hasattr(self, "_npar") else -1
if npar != nparOld:
self.rescalingExp = [1.] * npar
self._npar = npar
@property
def nAs(self):
"""Value of nAs."""
return self._nAs
@nAs.setter
def nAs(self, nAs):
self._nAs = nAs
self.resetAs()
@property
def nbs(self):
"""Value of nbs."""
return self._nbs
@nbs.setter
def nbs(self, nbs):
self._nbs = nbs
self.resetbs()
@property
def nbsH(self) -> int:
return max(self.nbs, self.nAs)
def spacedim(self):
return self.As[0].shape[1]
def checkParameter(self, mu:paramList):
return checkParameter(mu, self.npar)
def checkParameterList(self, mu:paramList):
return checkParameterList(mu, self.npar)
def buildEnergyNormForm(self): # eye
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
vbMng(self, "INIT", "Assembling energy matrix.", 20)
self.energyNormMatrix = Np2DLikeEye()
vbMng(self, "DEL", "Done assembling energy matrix.", 20)
def buildEnergyNormDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product.
"""
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
vbMng(self, "INIT", "Assembling energy dual matrix.", 20)
self.energyNormDualMatrix = self.energyNormMatrix
vbMng(self, "DEL", "Done assembling energy dual matrix.", 20)
def buildDualityPairingForm(self):
"""Build sparse matrix (in CSR format) representative of duality."""
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
vbMng(self, "INIT", "Assembling duality matrix.", 20)
self.dualityMatrix = self.energyNormMatrix
vbMng(self, "DEL", "Done assembling duality matrix.", 20)
def buildEnergyNormPartialDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
if not hasattr(self, "energyNormDualMatrix"):
self.buildEnergyNormDualForm()
vbMng(self, "INIT", "Assembling energy partial dual matrix.", 20)
self.energyNormPartialDualMatrix = self.energyNormDualMatrix
vbMng(self, "DEL", "Done assembling energy partial dual matrix.", 20)
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False,
dual : bool = False, duality : bool = True) -> Np2D:
"""Scalar product."""
if dual:
if duality:
if not hasattr(self, "energyNormDualMatrix"):
self.buildEnergyNormDualForm()
energyMat = self.energyNormDualMatrix
else:
if not hasattr(self, "energyNormPartialDualMatrix"):
self.buildEnergyNormPartialDualForm()
energyMat = self.energyNormPartialDualMatrix
else:
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
energyMat = self.energyNormMatrix
if not isinstance(u, (np.ndarray,)): u = u.data
if not isinstance(v, (np.ndarray,)): v = v.data
if onlyDiag:
return np.sum(energyMat.dot(u) * v.conj(), axis = 0)
return v.T.conj().dot(energyMat.dot(u))
def norm(self, u:Np2D, dual : bool = False, duality : bool = True) -> Np1D:
return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual,
duality = duality)) ** .5
def checkAInBounds(self, derI : int = 0):
"""Check if derivative index is oob for operator of linear system."""
if derI < 0 or derI >= self.nAs:
d = self.spacedim()
return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)),
shape = (d, d), dtype = np.complex)
def checkbInBounds(self, derI : int = 0, homogeneized : bool = False):
"""Check if derivative index is oob for RHS of linear system."""
nbs = self.nbsH if homogeneized else self.nbs
if derI < 0 or derI >= nbs:
return np.zeros(self.spacedim(), dtype = np.complex)
def resetAs(self):
"""Reset (derivatives of) operator of linear system."""
self.setAs([None] * self.nAs)
if hasattr(self, "_nbs"): self.resetbsH()
def resetbs(self):
"""Reset (derivatives of) RHS of linear system."""
self.setbs([None] * self.nbs)
if hasattr(self, "_nAs"): self.resetbsH()
def resetbsH(self):
"""Reset (derivatives of) homogeneized RHS of linear system."""
self.setbsH([None] * self.nbsH)
def setAs(self, As:List[Np2D]):
"""Assign terms of operator of linear system."""
if len(As) != self.nAs:
raise RROMPyException(("Expected number {} of terms of As not "
"matching given list length {}.").format(self.nAs,
len(As)))
self.As = [copy(A) for A in As]
def setbs(self, bs:List[Np1D]):
"""Assign terms of RHS of linear system."""
if len(bs) != self.nbs:
raise RROMPyException(("Expected number {} of terms of bs not "
"matching given list length {}.").format(self.nbs,
len(bs)))
self.bs = [copy(b) for b in bs]
def setbsH(self, bsH:List[Np1D]):
"""Assign terms of homogeneized RHS of linear system."""
if len(bsH) != self.nbsH:
raise RROMPyException(("Expected number {} of terms of bsH not "
"matching given list length {}.").format(self.nbsH,
len(bsH)))
self.bsH = [copy(bH) for bH in bsH]
def _assembleA(self, mu : paramVal = [], der : List[int] = 0,
derI : int = None) -> ScOp:
"""Assemble (derivative of) operator of linear system."""
mu = self.checkParameter(mu)
if self.npar > 0: mu = mu[0]
if not hasattr(der, "__len__"): der = [der] * self.npar
if derI is None: derI = hashD(der)
Anull = self.checkAInBounds(derI)
if Anull is not None: return Anull
rExp = self.rescalingExp
A = copy(self.As[derI])
for j in range(derI + 1, self.nAs):
derIdx = hashI(j, self.npar)
diffIdx = [x - y for (x, y) in zip(derIdx, der)]
if np.all([x >= 0 for x in diffIdx]):
A = A + (np.prod((mu ** rExp) ** diffIdx)
* multibinom(derIdx, diffIdx) * self.As[j])
return A
@abstractmethod
def A(self, mu : paramVal = [], der : List[int] = 0) -> ScOp:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
if not hasattr(der, "__len__"): der = [der] * self.npar
derI = hashD(der)
for j in range(derI, self.nAs):
if self.As[j] is None: self.As[j] = self.checkAInBounds(-1)
return self._assembleA(mu, der, derI)
def affineLinearSystemA(self, mu : paramVal = [],
sclF : ListAny = None) -> List[Np2D]:
"""
Assemble affine blocks of operator of linear system (just linear
blocks).
"""
if sclF is None: sclF = [1.] * self.npar
As = [None] * self.nAs
for j in range(self.nAs):
derIdx = hashI(j, self.npar)
mult = np.prod([s ** d for (s, d) in zip(sclF, derIdx)])
As[j] = mult * self.A(mu, derIdx)
return As
def _assembleb(self, mu : paramVal = [], der : List[int] = 0,
derI : int = None, homogeneized : bool = False) -> ScOp:
"""Assemble (derivative of) (homogeneized) RHS of linear system."""
mu = self.checkParameter(mu)
if self.npar > 0: mu = mu[0]
if not hasattr(der, "__len__"): der = [der] * self.npar
if derI is None: derI = hashD(der)
bnull = self.checkbInBounds(derI, homogeneized)
if bnull is not None: return bnull
bs = self.bsH if homogeneized else self.bs
rExp = self.rescalingExp
b = copy(bs[derI])
for j in range(derI + 1, len(bs)):
derIdx = hashI(j, self.npar)
diffIdx = [x - y for (x, y) in zip(derIdx, der)]
if np.all([x >= 0 for x in diffIdx]):
b = b + (np.prod((mu ** rExp) ** diffIdx)
* multibinom(derIdx, diffIdx) * bs[j])
return b
@abstractmethod
def b(self, mu : paramVal = [], der : List[int] = 0,
homogeneized : bool = False) -> Np1D:
"""
Assemble terms of (homogeneized) RHS of linear system and return it (or
its derivative) at a given parameter.
"""
if not hasattr(der, "__len__"): der = [der] * self.npar
derI = hashD(der)
if homogeneized:
for j in range(derI, self.nbsH):
if self.bsH[j] is None: self.bsH[j] = self.checkbInBounds(-1)
else:
for j in range(derI, self.nbs):
if self.bs[j] is None: self.bs[j] = self.checkbInBounds(-1)
return self._assembleb(mu, der, derI, homogeneized)
def affineLinearSystemb(self, mu : paramVal = [], sclF : ListAny = None,
homogeneized : bool = False) -> List[Np1D]:
"""
Assemble affine blocks of RHS of linear system (just linear blocks).
"""
if sclF is None: sclF = [1.] * self.npar
nbs = self.nbsH if homogeneized else self.nbs
bs = [None] * nbs
for j in range(nbs):
derIdx = hashI(j, self.npar)
mult = np.prod([s ** d for (s, d) in zip(sclF, derIdx)])
bs[j] = mult * self.b(mu, derIdx, homogeneized)
return bs
def setSolver(self, solverType:str, solverArgs : DictAny = {}):
"""Choose solver type and parameters."""
self._solver, self._solverArgs = setupSolver(solverType, solverArgs)
def solve(self, mu : paramList = [], RHS : sampList = None,
homogeneized : bool = False) -> sampList:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
"""
mu = self.checkParameterList(mu)[0]
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
sol = emptySampleList()
if len(mu) > 0:
if RHS is None:
RHS = [self.b(m, homogeneized = homogeneized) for m in mu]
RHS = sampleList(RHS)
mult = 0 if len(RHS) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size")
u = self._solver(self.A(mu[0]), RHS[0], self._solverArgs)
sol.reset((len(u), len(mu)), dtype = u.dtype)
sol[0] = u
for j in range(1, len(mu), self._solveBatchSize):
if self._solveBatchSize != 1:
iRange = list(range(j, min(j + self._solveBatchSize,
len(mu))))
As = [self.A(mu[i]) for i in iRange]
bs = [RHS[mult * i] for i in iRange]
A, b = tensorizeLS(As, bs)
else:
A, b = self.A(mu[j]), RHS[mult * j]
solStack = self._solver(A, b, self._solverArgs)
if self._solveBatchSize != 1:
sol[iRange] = detensorizeLS(solStack, len(iRange))
else:
sol[j] = solStack
return sol
def residual(self, u:sampList, mu : paramList = [],
homogeneized : bool = False,
duality : bool = True) -> sampList:
"""
Find residual of linear system for given approximate solution.
Args:
u: numpy complex array with function dofs. If None, set to 0.
mu: parameter value.
"""
mu = self.checkParameterList(mu)[0]
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
if u is not None:
u = sampleList(u)
mult = 0 if len(u) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size")
res = emptySampleList()
if duality and not hasattr(self, "dualityMatrix"):
self.buildDualityPairingForm()
for j in range(len(mu)):
b = self.b(mu[j], homogeneized = homogeneized)
if u is None:
r = b
else:
r = b - self.A(mu[j]).dot(u[mult * j])
if j == 0:
res.reset((len(r), len(mu)), dtype = r.dtype)
if duality:
r = self.dualityMatrix.dot(r)
res[j] = r
return res
def _rayleighQuotient(self, A:Np2D, v0:Np1D, M:Np2D, sigma : float = 0.,
nIterP : int = 10, nIterR : int = 10) -> float:
nIterP = min(nIterP, len(v0) // 2)
nIterR = min(nIterR, (len(v0) + 1) // 2)
v0 /= v0.T.conj().dot(M.dot(v0)) ** .5
for j in range(nIterP):
v0 = self._solver(A - sigma * M, M.dot(v0), self._solverArgs)
v0 /= v0.T.conj().dot(M.dot(v0)) ** .5
l0 = v0.T.conj().dot(A.dot(v0))
for j in range(nIterR):
v0 = self._solver(A - l0 * M, M.dot(v0), self._solverArgs)
v0 /= v0.T.conj().dot(M.dot(v0)) ** .5
l0 = v0.T.conj().dot(A.dot(v0))
if np.isnan(l0): l0 = np.finfo(float).eps
return np.abs(l0)
def stabilityFactor(self, u:sampList, mu : paramList = [],
nIterP : int = 10, nIterR : int = 10) -> sampList:
"""
Find stability factor of matrix of linear system using iterative
inverse power iteration- and Rayleigh quotient-based procedure.
Args:
u: numpy complex arrays with function dofs.
mu: parameter values.
nIterP: number of iterations of power method.
nIterR: number of iterations of Rayleigh quotient method.
"""
mu = self.checkParameterList(mu)[0]
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
u = sampleList(u)
mult = 0 if len(u) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size")
stabFact = np.empty(len(mu), dtype = float)
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
for j in range(len(mu)):
stabFact[j] = self._rayleighQuotient(self.A(mu[j]), u[mult * j],
self.energyNormMatrix,
0., nIterP, nIterR)
return stabFact
def plot(self, u:Np1D, warping : List[callable] = None, name : str = "u",
save : str = None, what : strLst = 'all',
saveFormat : str = "eps", saveDPI : int = 100, show : bool = True,
pyplotArgs : dict = {}, **figspecs):
"""
Do some nice plots of the complex-valued function with given dofs.
Args:
u: numpy complex array with function dofs.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
pyplotArgs(optional): Optional arguments for pyplot.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if len(what) == 0: return
if 'figsize' not in figspecs.keys():
figspecs['figsize'] = (13. * len(what) / 4, 3)
subplotcode = 100 + len(what) * 10
idxs = np.arange(self.spacedim())
if warping is not None:
idxs = warping[0](np.arange(self.spacedim()))
plt.figure(**figspecs)
plt.jet()
if 'ABS' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.abs(u).flatten(), **pyplotArgs)
plt.title("|{0}|".format(name))
if 'PHASE' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.angle(u).flatten(), **pyplotArgs)
plt.title("phase({0})".format(name))
if 'REAL' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.real(u).flatten(), **pyplotArgs)
plt.title("Re({0})".format(name))
if 'IMAG' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
plt.plot(idxs, np.imag(u).flatten(), **pyplotArgs)
plt.title("Im({0})".format(name))
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_fig_".format(save), saveFormat),
format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()

Event Timeline