Page MenuHomec4science

hfengine_base.py
No OneTemporary

File Metadata

Created
Wed, May 8, 16:13

hfengine_base.py

# Copyright (C) 2018-2020 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from abc import abstractmethod
import numpy as np
import scipy.sparse as scsp
from numbers import Number
from collections.abc import Iterable
from copy import copy as softcopy
from rrompy.utilities.base.decorators import (nonaffine_construct,
mu_independent)
from rrompy.utilities.base.types import (Np1D, Np2D, List, DictAny, paramVal,
paramList, sampList)
from rrompy.utilities.numerical import solve as tsolve, dot
from rrompy.utilities.expression import expressionEvaluator
from rrompy.utilities.exception_manager import RROMPyAssert
from rrompy.sampling.sample_list import sampleList
from rrompy.parameter import (checkParameter, checkParameterList,
parameterList, parameterMap as pMap)
from rrompy.solver.linear_solver import setupSolver
from rrompy.utilities.parallel import (poolRank, masterCore, listScatter,
matrixGatherv, isend, recv)
__all__ = ['HFEngineBase']
class HFEngineBase:
"""Generic solver for parametric problems."""
def __init__(self, verbosity : int = 10, timestamp : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
self.setSolver("SPSOLVE", {"use_umfpack" : False})
self.npar = 0
self._C = None
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
def __deepcopy__(self, memo):
return softcopy(self)
@property
def npar(self):
"""Value of npar."""
return self._npar
@npar.setter
def npar(self, npar):
nparOld = self._npar if hasattr(self, "_npar") else -1
if npar != nparOld:
self.parameterMap = pMap(1., npar)
self._npar = npar
@property
def spacedim(self):
return 1
def checkParameter(self, mu:paramVal) -> paramVal:
muP = checkParameter(mu, self.npar)
if self.npar == 0: muP.reset((1, 0), muP.dtype)
return muP
def checkParameterList(self, mu:paramList,
check_if_single : bool = False) -> paramList:
muL = checkParameterList(mu, self.npar, check_if_single)
return muL
def mapParameterList(self, mu:paramList, direct : str = "F",
idx : List[int] = None) -> paramList:
if idx is None: idx = np.arange(self.npar)
muMapped = checkParameterList(mu, len(idx))
for j, d in enumerate(idx):
muMapped.data[:, j] = expressionEvaluator(
self.parameterMap[direct][d],
muMapped(j)).flatten()
return muMapped
def buildEnergyNormForm(self):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self.energyNormMatrix = 1.
def buildEnergyNormDualForm(self):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
self.energyNormDualMatrix = 1.
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False,
dual : bool = False, is_state : bool = True) -> Np2D:
"""Scalar product."""
if is_state or self.isCEye:
if dual:
if not hasattr(self, "energyNormDualMatrix"):
self.buildEnergyNormDualForm()
energyMat = self.energyNormDualMatrix
else:
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
energyMat = self.energyNormMatrix
else:
energyMat = 1.
if isinstance(u, (parameterList, sampleList)): u = u.data
if isinstance(v, (parameterList, sampleList)): v = v.data
if onlyDiag:
return np.sum(dot(energyMat, u) * v.conj(), axis = 0)
return dot(dot(energyMat, u).T, v.conj()).T
def norm(self, u:Np2D, dual : bool = False,
is_state : bool = True) -> Np1D:
return np.abs(self.innerProduct(u, u, onlyDiag = True, dual = dual,
is_state = is_state)) ** .5
def baselineA(self):
"""Return 0 of shape consistent with operator of linear system."""
if (hasattr(self, "As") and isinstance(self.As, Iterable)
and self.As[0] is not None):
d = self.As[0].shape[0]
else:
d = self.spacedim
return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)),
shape = (d, d), dtype = np.complex)
def baselineb(self):
"""Return 0 of shape consistent with RHS of linear system."""
return np.zeros(self.spacedim, dtype = np.complex)
@nonaffine_construct
@abstractmethod
def A(self, mu : paramVal = [], der : List[int] = 0) -> Np2D:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
return
@nonaffine_construct
@abstractmethod
def b(self, mu : paramVal = [], der : List[int] = 0) -> Np1D:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
return
@mu_independent
def C(self, mu:paramVal):
"""
Value of C. Should be overridden (with something like
return self._C(mu)
) if a mu-dependent C is needed.
"""
if self._C is None: self._C = 1.
return self._C
@property
def isCEye(self):
"""
Whether the action of C can be seen as a scalar multiplication. Should
be overridden (with
return True
) if a mu-dependent scalar C is used.
"""
return isinstance(self._C, Number)
def applyC(self, u:sampList, mu:paramVal):
"""Apply LHS of linear system."""
return dot(self.C(mu), u)
def setSolver(self, solverType:str, solverArgs : DictAny = {}):
"""Choose solver type and parameters."""
self._solver, self._solverArgs = setupSolver(solverType, solverArgs)
def solve(self, mu : paramList = [], RHS : sampList = None,
return_state : bool = False) -> sampList:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
return_state: whether to return state before multiplication by c.
Defaults to False.
"""
from rrompy.sampling import sampleList, emptySampleList
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)
if len(mu) == 0: return emptySampleList()
mu = self.checkParameterList(mu)
mu_loc, idx, sizes = listScatter(mu, return_sizes = True)
mu_loc = self.checkParameterList(mu_loc)
req, emptyCores = [], np.where(np.logical_not(sizes))[0]
if len(mu_loc) == 0:
uL, uT = recv(source = 0, tag = poolRank())
sol = np.empty((uL, 0), dtype = uT)
else:
if RHS is None: # build RHSs
RHS = sampleList([self.b(m) for m in mu_loc])
else:
RHS = sampleList(RHS)
if len(RHS) > 1: RHS = sampleList([RHS[i] for i in idx])
mult = 0 if len(RHS) == 1 else 1
RROMPyAssert(mult * (len(mu_loc) - 1) + 1, len(RHS), "Sample size")
for j, mj in enumerate(mu_loc):
u = tsolve(self.A(mj), RHS[mult * j], self._solver,
self._solverArgs)
if not return_state: u = self.applyC(u, mj)
if j == 0:
sol = np.empty((len(u), len(mu_loc)), dtype = u.dtype)
if masterCore():
for dest in emptyCores:
req += [isend((len(u), u.dtype), dest = dest,
tag = dest)]
sol[:, j] = u
for r in req: r.wait()
sol = matrixGatherv(sol, sizes)
return sampleList(sol)
def residual(self, mu : paramList = [], u : sampList = None,
post_c : bool = True) -> sampList:
"""
Find residual of linear system for given approximate solution.
Args:
mu: parameter value.
u: numpy complex array with function dofs. If None, set to 0.
post_c: whether to post-process using c. Defaults to True.
"""
from rrompy.sampling import sampleList, emptySampleList
if mu == []: mu = self.mu0
mu = self.checkParameterList(mu)
if len(mu) == 0: return emptySampleList()
mu_loc, idx, sizes = listScatter(mu, return_sizes = True)
mu_loc = self.checkParameterList(mu_loc)
req, emptyCores = [], np.where(np.logical_not(sizes))[0]
if len(mu_loc) == 0:
uL, uT = recv(source = 0, tag = poolRank())
res = np.empty((uL, 0), dtype = uT)
else:
v = sampleList(np.zeros((self.spacedim, len(mu_loc))))
if u is not None:
u = sampleList(u)
v = v + sampleList([u[i] for i in idx])
for j, (mj, vj) in enumerate(zip(mu_loc, v)):
r = self.b(mj) - dot(self.A(mj), vj)
if post_c: r = self.applyC(r, mj)
if j == 0:
res = np.empty((len(r), len(mu_loc)), dtype = r.dtype)
if masterCore():
for dest in emptyCores:
req += [isend((len(r), r.dtype), dest = dest,
tag = dest)]
res[:, j] = r
for r in req: r.wait()
res = matrixGatherv(res, sizes)
return sampleList(res)
cutOffPolesRMax,cutOffPolesRMin = np.inf, - np.inf
cutOffPolesRMaxRel, cutOffPolesRMinRel = np.inf, - np.inf
cutOffPolesIMax, cutOffPolesIMin = np.inf, - np.inf
cutOffPolesIMaxRel, cutOffPolesIMinRel = np.inf, - np.inf
cutOffResNormMin = -1
cutOffResAngleMin, cutOffResAngleMax = -1, np.pi + 1
@property
def _ignoreResidues(self):
return (self.cutOffResNormMin <= 0. and self.cutOffResAngleMin <= 0.
and self.cutOffResAngleMax >= np.pi)
def flagBadPolesResidues(self, poles:Np1D, residues : Np1D = None,
relative : bool = False,
projMat : Np2D = None) -> Np1D:
"""
Flag (numerical) poles/residues which are impossible.
Args:
poles: poles to be judged.
residues: residues norms to be judged.
relative: whether relative values should be used for poles.
projMat: matrix for projection of residues.
"""
poles = np.array(poles).flatten()
flag = np.zeros(len(poles), dtype = bool)
if relative:
RMax, RMin = self.cutOffPolesRMaxRel, self.cutOffPolesRMinRel
IMax, IMin = self.cutOffPolesIMaxRel, self.cutOffPolesIMinRel
else:
RMax, RMin = self.cutOffPolesRMax, self.cutOffPolesRMin
IMax, IMin = self.cutOffPolesIMax, self.cutOffPolesIMin
if not np.isinf(RMax):
flag = np.logical_or(flag, np.real(poles) > RMax)
if not np.isinf(RMin):
flag = np.logical_or(flag, np.real(poles) < RMin)
if not np.isinf(IMax):
flag = np.logical_or(flag, np.imag(poles) > IMax)
if not np.isinf(IMin):
flag = np.logical_or(flag, np.imag(poles) < IMin)
if residues is not None and not self._ignoreResidues:
residues = np.array(residues).reshape(-1, len(poles))
if projMat is None:
resNorm = np.linalg.norm(residues, axis = 0)
else:
residues = projMat.dot(residues)
resNorm = self.norm(residues)
if self.cutOffResNormMin > 0.:
resNormEff = resNorm / np.max(resNorm)
flag = np.logical_or(flag, resNormEff < self.cutOffResNormMin)
if self.cutOffResAngleMin > 0. or self.cutOffResAngleMax < np.pi:
if projMat is None:
angles = np.real(residues.T.conj().dot(residues))
else:
angles = np.real(self.innerProduct(residues, residues))
resNormEff = resNorm
resNormEff[np.isclose(resNormEff, 0., atol = 1e-15)] = 1.
angles = np.clip((angles / resNormEff).T / resNormEff, -1., 1.)
angles = np.arccos(angles)
badangles = np.logical_or(angles < self.cutOffResAngleMin,
angles > self.cutOffResAngleMax)
badangles[np.arange(len(angles)), np.arange(len(angles))] = 0
idx = np.zeros(len(angles), dtype = bool)
while np.sum(badangles) > 0:
idxn = np.argmax(np.sum(badangles, axis = 1))
badangles[idxn], badangles[:, idxn] = 0, 0
idx[idxn] = 1
flag = np.logical_or(flag, idx)
return flag

Event Timeline