diff --git a/rrompy/hfengines/base/matrix_engine_base.py b/rrompy/hfengines/base/matrix_engine_base.py
index 65f9fa7..5908ff3 100644
--- a/rrompy/hfengines/base/matrix_engine_base.py
+++ b/rrompy/hfengines/base/matrix_engine_base.py
@@ -1,460 +1,460 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from abc import abstractmethod
import numpy as np
import scipy.sparse as scsp
from matplotlib import pyplot as plt
from copy import deepcopy as copy, copy as softcopy
from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, strLst, Tuple, List,
DictAny, paramVal, paramList,
sampList)
from rrompy.utilities.base import (purgeList, getNewFilename, verbosityDepth,
multibinom)
from rrompy.utilities.poly_fitting.polynomial import (
hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI)
from rrompy.utilities.exception_manager import RROMPyException, RROMPyAssert
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList, emptySampleList
from rrompy.solver import setupSolver
from rrompy.solver.scipy import tensorizeLS, detensorizeLS
__all__ = ['MatrixEngineBase']
class MatrixEngineBase:
"""
Generic solver for parametric matrix problems.
Attributes:
verbosity: Verbosity level.
As: Scipy sparse array representation (in CSC format) of As.
bs: Numpy array representation of bs.
bsH: Numpy array representation of homogeneized bs.
energyNormMatrix: Scipy sparse matrix representing inner product.
"""
_solveBatchSize = 1
def __init__(self, verbosity : int = 10, timestamp : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
self.nAs, self.nbs = 1, 1
self.setSolver("SPSOLVE", {"use_umfpack" : False})
self.npar = 0
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
def __deepcopy__(self, memo):
return softcopy(self)
@property
def npar(self):
"""Value of npar."""
return self._npar
@npar.setter
def npar(self, npar):
nparOld = self._npar if hasattr(self, "_npar") else -1
if npar != nparOld:
self.rescalingExp = [1.] * npar
self._npar = npar
@property
def nAs(self):
"""Value of nAs."""
return self._nAs
@nAs.setter
def nAs(self, nAs):
self._nAs = nAs
self.resetAs()
@property
def nbs(self):
"""Value of nbs."""
return self._nbs
@nbs.setter
def nbs(self, nbs):
self._nbs = nbs
self.resetbs()
@property
def nbsH(self) -> int:
return max(self.nbs, self.nAs)
def spacedim(self):
return self.As[0].shape[1]
def checkParameter(self, mu:paramList):
return checkParameter(mu, self.npar)
def checkParameterList(self, mu:paramList):
return checkParameterList(mu, self.npar)
def buildEnergyNormForm(self): # eye
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self.energyNormMatrix = np.eye(self.spacedim())
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False) -> Np2D:
"""Scalar product."""
if not hasattr(self, "energyNormMatrix"):
if self.verbosity >= 20:
verbosityDepth("INIT", "Assembling energy matrix.",
timestamp = self.timestamp)
self.buildEnergyNormForm()
if self.verbosity >= 20:
verbosityDepth("DEL", "Done assembling energy matrix.",
timestamp = self.timestamp)
if not isinstance(u, (np.ndarray,)): u = u.data
if not isinstance(v, (np.ndarray,)): v = v.data
if onlyDiag:
return np.sum(self.energyNormMatrix.dot(u) * v.conj(), axis = 0)
return v.T.conj().dot(self.energyNormMatrix.dot(u))
def norm(self, u:Np2D) -> Np1D:
return np.abs(self.innerProduct(u, u, onlyDiag = True)) ** .5
def checkAInBounds(self, derI : int = 0):
"""Check if derivative index is oob for operator of linear system."""
if derI < 0 or derI >= self.nAs:
d = self.spacedim()
return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)),
shape = (d, d), dtype = np.complex)
def checkbInBounds(self, derI : int = 0, homogeneized : bool = False):
"""Check if derivative index is oob for RHS of linear system."""
nbs = self.nbsH if homogeneized else self.nbs
if derI < 0 or derI >= nbs:
return np.zeros(self.spacedim(), dtype = np.complex)
def resetAs(self):
"""Reset (derivatives of) operator of linear system."""
self.setAs([None] * self.nAs)
if hasattr(self, "_nbs"): self.resetbsH()
def resetbs(self):
"""Reset (derivatives of) RHS of linear system."""
self.setbs([None] * self.nbs)
if hasattr(self, "_nAs"): self.resetbsH()
def resetbsH(self):
"""Reset (derivatives of) homogeneized RHS of linear system."""
self.setbsH([None] * self.nbsH)
def setAs(self, As:List[Np2D]):
"""Assign terms of operator of linear system."""
if len(As) != self.nAs:
raise RROMPyException(("Expected number {} of terms of As not "
"matching given list length {}.").format(self.nAs,
len(As)))
self.As = [copy(A) for A in As]
def setbs(self, bs:List[Np1D]):
"""Assign terms of RHS of linear system."""
if len(bs) != self.nbs:
raise RROMPyException(("Expected number {} of terms of bs not "
"matching given list length {}.").format(self.nbs,
len(bs)))
self.bs = [copy(b) for b in bs]
def setbsH(self, bsH:List[Np1D]):
"""Assign terms of homogeneized RHS of linear system."""
if len(bsH) != self.nbsH:
raise RROMPyException(("Expected number {} of terms of bsH not "
"matching given list length {}.").format(self.nbsH,
len(bsH)))
self.bsH = [copy(bH) for bH in bsH]
def _assembleA(self, mu : paramVal = [], der : List[int] = 0,
derI : int = None, muBase : paramVal = None) -> ScOp:
"""Assemble (derivative of) operator of linear system."""
mu = self.checkParameter(mu)
if muBase is None: muBase = [0.] * self.npar
muBase = self.checkParameter(muBase)
if self.npar > 0: mu, muBase = mu[0], muBase[0]
if not hasattr(der, "__len__"): der = [der] * self.npar
if derI is None: derI = hashD(der)
Anull = self.checkAInBounds(derI)
if Anull is not None: return Anull
rExp = self.rescalingExp
A = copy(self.As[derI])
for j in range(derI + 1, self.nAs):
derIdx = hashI(j, self.npar)
diffIdx = [x - y for (x, y) in zip(derIdx, der)]
if np.all([x >= 0 for x in diffIdx]):
A = A + (np.prod((mu ** rExp - muBase ** rExp) ** diffIdx)
* multibinom(derIdx, diffIdx) * self.As[j])
return A
@abstractmethod
def A(self, mu : paramVal = [], der : List[int] = 0) -> ScOp:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
if not hasattr(der, "__len__"): der = [der] * self.npar
derI = hashD(der)
for j in range(derI, self.nAs):
if self.As[j] is None: self.As[j] = self.checkAInBounds(-1)
return self._assembleA(mu, der, derI)
def affineLinearSystemA(self, mu : paramVal = []) -> List[Np2D]:
"""
Assemble affine blocks of operator of linear system (just linear
blocks).
"""
As = [None] * self.nAs
for j in range(self.nAs):
As[j] = self.A(mu, hashI(j, self.npar))
return As
def affineWeightsA(self, mu : paramVal = []) -> List[str]:
"""
Assemble affine blocks of operator of linear system (just affine
weights). Stored as strings for the sake of pickling.
"""
mu = self.checkParameter(mu)
lambdasA = ["1."]
mu0Eff = mu ** self.rescalingExp
for j in range(1, self.nAs):
lambdasA += ["np.prod((mu ** ({1}) - [{0}]) ** ({2}))".format(
','.join([str(x) for x in mu0Eff[0]]),
self.rescalingExp, hashI(j, self.npar))]
return lambdasA
def affineBlocksA(self, mu : paramVal = [])\
-> Tuple[List[Np2D], List[str]]:
"""Assemble affine blocks of operator of linear system."""
return self.affineLinearSystemA(mu), self.affineWeightsA(mu)
def _assembleb(self, mu : paramVal = [], der : List[int] = 0,
derI : int = None, homogeneized : bool = False,
muBase : paramVal = None) -> ScOp:
"""Assemble (derivative of) (homogeneized) RHS of linear system."""
mu = self.checkParameter(mu)
if muBase is None: muBase = [0.] * self.npar
muBase = self.checkParameter(muBase)
if self.npar > 0: mu, muBase = mu[0], muBase[0]
if not hasattr(der, "__len__"): der = [der] * self.npar
if derI is None: derI = hashD(der)
bnull = self.checkbInBounds(derI, homogeneized)
if bnull is not None: return bnull
bs = self.bsH if homogeneized else self.bs
rExp = self.rescalingExp
b = copy(bs[derI])
for j in range(derI + 1, len(bs)):
derIdx = hashI(j, self.npar)
diffIdx = [x - y for (x, y) in zip(derIdx, der)]
if np.all([x >= 0 for x in diffIdx]):
b = b + (np.prod((mu ** rExp - muBase ** rExp) ** diffIdx)
* multibinom(derIdx, diffIdx) * bs[j])
return b
@abstractmethod
def b(self, mu : paramVal = [], der : List[int] = 0,
homogeneized : bool = False) -> Np1D:
"""
Assemble terms of (homogeneized) RHS of linear system and return it (or
its derivative) at a given parameter.
"""
if not hasattr(der, "__len__"): der = [der] * self.npar
derI = hashD(der)
if homogeneized:
for j in range(derI, self.nbsH):
if self.bsH[j] is None: self.bsH[j] = self.checkbInBounds(-1)
else:
for j in range(derI, self.nbs):
if self.bs[j] is None: self.bs[j] = self.checkbInBounds(-1)
return self._assembleb(mu, der, derI, homogeneized)
def affineLinearSystemb(self, mu : paramVal = [],
homogeneized : bool = False) -> List[Np1D]:
"""
Assemble affine blocks of RHS of linear system (just linear blocks).
"""
nbs = self.nbsH if homogeneized else self.nbs
bs = [None] * nbs
for j in range(nbs):
bs[j] = self.b(mu, hashI(j, self.npar), homogeneized)
return bs
def affineWeightsb(self, mu : paramVal = [],
homogeneized : bool = False) -> List[str]:
"""
Assemble affine blocks of RHS of linear system (just affine weights).
Stored as strings for the sake of pickling.
"""
mu = self.checkParameter(mu)
nbs = self.nbsH if homogeneized else self.nbs
lambdasb = ["1."]
mu0Eff = mu ** self.rescalingExp
for j in range(1, nbs):
lambdasb += ["np.prod((mu ** ({1}) - [{0}]) ** ({2}))".format(
','.join([str(x) for x in mu0Eff[0]]),
self.rescalingExp, hashI(j, self.npar))]
return lambdasb
def affineBlocksb(self, mu : paramVal = [], homogeneized : bool = False)\
-> Tuple[List[Np1D], List[str]]:
"""Assemble affine blocks of RHS of linear system."""
return (self.affineLinearSystemb(mu, homogeneized),
self.affineWeightsb(mu, homogeneized))
def setSolver(self, solverType:str, solverArgs : DictAny = {}):
"""Choose solver type and parameters."""
self._solver, self._solverArgs = setupSolver(solverType, solverArgs)
def solve(self, mu : paramList = [], RHS : sampList = None,
homogeneized : bool = False) -> sampList:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
"""
mu, _ = self.checkParameterList(mu)
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
sol = emptySampleList()
if len(mu) > 0:
if RHS is None:
RHS = [self.b(m, homogeneized = homogeneized) for m in mu]
RHS = sampleList(RHS)
mult = 0 if len(RHS) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size")
u = self._solver(self.A(mu[0]), RHS[0], self._solverArgs)
sol.reset((len(u), len(mu)), dtype = u.dtype)
sol[0] = u
for j in range(1, len(mu), self._solveBatchSize):
if self._solveBatchSize != 1:
iRange = list(range(j, min(j + self._solveBatchSize,
len(mu))))
As = [self.A(mu[i]) for i in iRange]
bs = [RHS[mult * i] for i in iRange]
A, b = tensorizeLS(As, bs)
else:
A, b = self.A(mu[j]), RHS[mult * j]
solStack = self._solver(A, b, self._solverArgs)
if self._solveBatchSize != 1:
sol[iRange] = detensorizeLS(solStack, len(iRange))
else:
sol[j] = solStack
return sol
def residual(self, u:sampList, mu : paramList = [],
homogeneized : bool = False) -> sampList:
"""
Find residual of linear system for given approximate solution.
Args:
u: numpy complex array with function dofs. If None, set to 0.
mu: parameter value.
"""
mu, _ = self.checkParameterList(mu)
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
if u is not None:
u = sampleList(u)
mult = 0 if len(u) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size")
res = emptySampleList()
for j in range(len(mu)):
b = self.b(mu[j], homogeneized = homogeneized)
if u is None:
r = b
else:
r = b - self.A(mu[j]).dot(u[mult * j])
if j == 0:
res.reset((len(r), len(mu)), dtype = r.dtype)
res[j] = r
return res
def plot(self, u:Np1D, name : str = "u", save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, show : bool = True, **figspecs):
"""
Do some nice plots of the complex-valued function with given dofs.
Args:
u: numpy complex array with function dofs.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
if isinstance(what, (str,)):
if what.upper() == 'ALL':
what = ['ABS', 'PHASE', 'REAL', 'IMAG']
else:
what = [what]
what = purgeList(what, ['ABS', 'PHASE', 'REAL', 'IMAG'],
listname = self.name() + ".what", baselevel = 1)
if len(what) == 0: return
if 'figsize' not in figspecs.keys():
figspecs['figsize'] = (13. * len(what) / 4, 3)
subplotcode = 100 + len(what) * 10
idxs = np.arange(self.spacedim())
plt.figure(**figspecs)
plt.jet()
if 'ABS' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
- plt.plot(idxs, np.abs(u))
+ plt.plot(idxs, np.abs(u).flatten())
plt.title("|{0}|".format(name))
if 'PHASE' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
- plt.plot(idxs, np.angle(u))
+ plt.plot(idxs, np.angle(u).flatten())
plt.title("phase({0})".format(name))
if 'REAL' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
- plt.plot(idxs, np.real(u))
+ plt.plot(idxs, np.real(u).flatten())
plt.title("Re({0})".format(name))
if 'IMAG' in what:
subplotcode = subplotcode + 1
plt.subplot(subplotcode)
- plt.plot(idxs, np.imag(u))
+ plt.plot(idxs, np.imag(u).flatten())
plt.title("Im({0})".format(name))
if save is not None:
save = save.strip()
plt.savefig(getNewFilename("{}_fig_".format(save), saveFormat),
format = saveFormat, dpi = saveDPI)
if show:
plt.show()
plt.close()
diff --git a/rrompy/solver/scipy/scipy_tensorize.py b/rrompy/solver/scipy/scipy_tensorize.py
index 6abeb2e..3eaa14d 100644
--- a/rrompy/solver/scipy/scipy_tensorize.py
+++ b/rrompy/solver/scipy/scipy_tensorize.py
@@ -1,53 +1,51 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
import scipy.sparse as scsp
from rrompy.utilities.base.types import Np1D, Np2D, List
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['tensorizeLS', 'detensorizeLS']
def tensorizeLS(As : List[Np2D] = [], bs : List[Np1D] = [],
AFormat : str = "csr"):
if len(As) > 0:
-# A = scsp.block_diag(tuple(As), format = AFormat)
A = scsp.block_diag(As, format = AFormat)
else:
A = None
if len(bs) > 0:
-# b = np.concatenate(tuple(bs), axis = None)
b = np.concatenate(bs, axis = None)
else:
b = None
return A, b
def detensorizeLS(x:Np1D, n : int = 0, sizes : List[int] = []):
if n > 0 and len(sizes) > 0 and n != len(sizes):
raise RROMPyException("Specified n and sizes are inconsistent.")
if n == 0 and len(sizes) == 0:
raise RROMPyException("Must specify either n or sizes.")
if len(sizes) == 0:
sizes = [len(x) // n] * n
if n * sizes[0] != len(x):
raise RROMPyException(("Number of chunks must divide vector "
"length."))
n = len(sizes)
sEnd = np.cumsum(sizes)
sStart = sEnd - sizes[0]
return [x[sStart[j] : sEnd[j]] for j in range(n)]