Page MenuHomec4science

marginal_proxy_engine.py
No OneTemporary

File Metadata

Created
Thu, Jun 6, 06:25

marginal_proxy_engine.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
import scipy.sparse as scsp
from copy import deepcopy as copy, copy as softcopy
from rrompy.utilities.base.types import (Np1D, Np2D, ScOp, Tuple, List,
paramVal, paramList, HFEng, sampList)
from rrompy.utilities.base import multibinom
from rrompy.utilities.poly_fitting.polynomial import (
hashDerivativeToIdx as hashD, hashIdxToDerivative as hashI)
from rrompy.utilities.exception_manager import RROMPyAssert
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList, emptySampleList
from rrompy.solver.scipy import tensorizeLS, detensorizeLS
__all__ = ['MarginalProxyEngine']
class MarginalProxyEngine:
"""
Marginalized should prescribe fixed value for the marginalized parameters
and leave None elsewhere.
"""
def __init__(self, HFEngine:HFEng, marginalized:Np1D):
self.baseHF = HFEngine
self.marg = marginalized
self._setupAs()
self._setupbs()
@property
def freeLocations(self):
return tuple([x for x in range(self.nparBase) if self.marg[x] is None])
@property
def fixedLocations(self):
return tuple([x for x in range(self.nparBase) \
if self.marg[x] is not None])
@property
def muFixed(self):
muF = checkParameter([self.marg[x] for x in self.fixedLocations])
if self.nparBase - self.npar > 0: muF = muF[0]
return muF
@property
def rescalingExpFree(self):
return [self.baseHF.rescalingExp[x] for x in self.freeLocations]
@property
def rescalingExpFixed(self):
return [self.baseHF.rescalingExp[x] for x in self.fixedLocations]
@property
def V(self):
return self.baseHF.V
def name(self) -> str:
return "{}-proxy for {}".format(self.freeLocations, self.baseHF.name())
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def __dir_base__(self):
return [x for x in self.__dir__() if x[:2] != "__"]
def __deepcopy__(self, memo):
return softcopy(self)
@property
def npar(self):
"""Value of npar."""
return len(self.freeLocations)
@property
def nparBase(self):
"""Value of npar."""
return self.baseHF.npar
@property
def nAs(self):
"""Value of nAs."""
return self._nAs
@property
def nbs(self):
"""Value of nbs."""
return self._nbs
@property
def nbsH(self) -> int:
return max(self.nbs, self.nAs)
def spacedim(self):
return self.As[0].shape[1]
def checkParameter(self, mu:paramList):
return checkParameter(mu, self.npar)
def checkParameterList(self, mu:paramList):
return checkParameterList(mu, self.npar)
def buildEnergyNormForm(self):
self.baseHF.buildEnergyNormForm()
self.energyNormMatrix = self.baseHF.energyNormMatrix
def buildEnergyNormDualForm(self):
self.baseHF.buildEnergyNormDualForm()
self.energyNormDualMatrix = self.baseHF.energyNormDualMatrix
def buildDualityPairingForm(self):
self.baseHF.buildDualityPairingForm()
self.dualityMatrix = self.baseHF.dualityMatrix
def buildEnergyNormPartialDualForm(self):
self.baseHF.buildEnergyNormPartialDualForm()
self.energyNormPartialDualMatrix = (
self.baseHF.energyNormPartialDualMatrix)
def innerProduct(self, u:Np2D, v:Np2D, onlyDiag : bool = False,
dual : bool = False, duality : bool = True) -> Np2D:
return self.baseHF.innerProduct(u, v, onlyDiag, dual, duality)
def norm(self, u:Np2D, dual : bool = False, duality : bool = True) -> Np1D:
return self.baseHF.norm(u, dual, duality)
def checkAInBounds(self, derI : int = 0):
"""Check if derivative index is oob for operator of linear system."""
if derI < 0 or derI >= self.nAs:
d = self.spacedim()
return scsp.csr_matrix((np.zeros(0), np.zeros(0), np.zeros(d + 1)),
shape = (d, d), dtype = np.complex)
def checkbInBounds(self, derI : int = 0, homogeneized : bool = False):
"""Check if derivative index is oob for RHS of linear system."""
nbs = self.nbsH if homogeneized else self.nbs
if derI < 0 or derI >= nbs:
return np.zeros(self.spacedim(), dtype = np.complex)
def _assembleA(self, mu : paramVal = [], der : List[int] = 0,
derI : int = None) -> ScOp:
"""Assemble (derivative of) operator of linear system."""
mu = self.checkParameter(mu)
if self.npar > 0: mu = mu[0]
if not hasattr(der, "__len__"): der = [der] * self.npar
if derI is None: derI = hashD(der)
Anull = self.checkAInBounds(derI)
if Anull is not None: return Anull
rExp = self.rescalingExpFree
A = copy(self.As[derI])
for j in range(derI + 1, self.nAs):
derIdx = hashI(j, self.npar)
diffIdx = [x - y for (x, y) in zip(derIdx, der)]
if np.all([x >= 0 for x in diffIdx]):
A = A + (np.prod((mu ** rExp) ** diffIdx)
* multibinom(derIdx, diffIdx) * self.As[j])
return A
def A(self, mu : paramVal = [], der : List[int] = 0) -> ScOp:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
mu = self.checkParameter(mu)
if not hasattr(der, "__len__"): der = [der] * self.npar
derI = hashD(der)
return self._assembleA(mu, der, derI)
def _setupAs(self):
self.As = []
self._nAs = 0
for j in range(self.baseHF.nAs):
derjBase = hashI(j, self.nparBase)
jNew = hashD([derjBase[i] for i in self.freeLocations])
derjFixed = [derjBase[i] for i in self.fixedLocations]
A = (np.prod((self.muFixed ** self.rescalingExpFixed) ** derjFixed)
* self.baseHF.As[j])
if jNew >= self._nAs:
for _ in range(self._nAs, jNew):
self.As += [self.checkAInBounds(-1)]
self.As += [A]
self._nAs = jNew + 1
else:
self.As[jNew] = self.As[jNew] + A
def affineLinearSystemA(self, mu : paramVal = []) -> List[Np2D]:
"""
Assemble affine blocks of operator of linear system (just linear
blocks).
"""
As = [None] * self.nAs
for j in range(self.nAs):
As[j] = self.A(mu, hashI(j, self.npar))
return As
def affineWeightsA(self, mu : paramVal = []) -> List[str]:
"""
Assemble affine blocks of operator of linear system (just affine
weights). Stored as strings for the sake of pickling.
"""
mu = self.checkParameter(mu)
lambdasA = ["1."]
mu0Eff = mu ** self.rescalingExpFree
for j in range(1, self.nAs):
lambdasA += ["np.prod((mu ** ({1}) - [{0}]) ** ({2}))".format(
','.join([str(x) for x in mu0Eff[0]]),
self.rescalingExpFree,
hashI(j, self.npar))]
return lambdasA
def affineBlocksA(self, mu : paramVal = [])\
-> Tuple[List[Np2D], List[str]]:
"""Assemble affine blocks of operator of linear system."""
return self.affineLinearSystemA(mu), self.affineWeightsA(mu)
def _assembleb(self, mu : paramVal = [], der : List[int] = 0,
derI : int = None, homogeneized : bool = False) -> ScOp:
"""Assemble (derivative of) (homogeneized) RHS of linear system."""
mu = self.checkParameter(mu)
if self.npar > 0: mu = mu[0]
if not hasattr(der, "__len__"): der = [der] * self.npar
if derI is None: derI = hashD(der)
bnull = self.checkbInBounds(derI, homogeneized)
if bnull is not None: return bnull
bs = self.bsH if homogeneized else self.bs
rExp = self.rescalingExpFree
b = copy(bs[derI])
for j in range(derI + 1, len(bs)):
derIdx = hashI(j, self.npar)
diffIdx = [x - y for (x, y) in zip(derIdx, der)]
if np.all([x >= 0 for x in diffIdx]):
b = b + (np.prod((mu ** rExp) ** diffIdx)
* multibinom(derIdx, diffIdx) * bs[j])
return b
def b(self, mu : paramVal = [], der : List[int] = 0,
homogeneized : bool = False) -> Np1D:
"""
Assemble terms of (homogeneized) RHS of linear system and return it (or
its derivative) at a given parameter.
"""
mu = self.checkParameter(mu)
if not hasattr(der, "__len__"): der = [der] * self.npar
derI = hashD(der)
return self._assembleb(mu, der, derI, homogeneized)
def _setupbs(self):
self.bs = []
self._nbs = 0
for j in range(self.baseHF.nbs):
derjBase = hashI(j, self.nparBase)
jNew = hashD([derjBase[i] for i in self.freeLocations])
derjFixed = [derjBase[i] for i in self.fixedLocations]
b = (np.prod((self.muFixed ** self.rescalingExpFixed) ** derjFixed)
* self.baseHF.bs[j])
if jNew >= self._nbs:
for _ in range(self._nbs, jNew):
self.bs += [self.checkbInBounds(-1)]
self.bs += [b]
self._nbs = jNew + 1
else:
self.bs[jNew] = self.bs[jNew] + b
def affineLinearSystemb(self, mu : paramVal = [],
homogeneized : bool = False) -> List[Np1D]:
"""
Assemble affine blocks of RHS of linear system (just linear blocks).
"""
nbs = self.nbsH if homogeneized else self.nbs
bs = [None] * nbs
for j in range(nbs):
bs[j] = self.b(mu, hashI(j, self.npar), homogeneized)
return bs
def affineWeightsb(self, mu : paramVal = [],
homogeneized : bool = False) -> List[str]:
"""
Assemble affine blocks of RHS of linear system (just affine weights).
Stored as strings for the sake of pickling.
"""
mu = self.checkParameter(mu)
nbs = self.nbsH if homogeneized else self.nbs
lambdasb = ["1."]
mu0Eff = mu ** self.rescalingExpFree
for j in range(1, nbs):
lambdasb += ["np.prod((mu ** ({1}) - [{0}]) ** ({2}))".format(
','.join([str(x) for x in mu0Eff[0]]),
self.rescalingExpFree,
hashI(j, self.npar))]
return lambdasb
def affineBlocksb(self, mu : paramVal = [], homogeneized : bool = False)\
-> Tuple[List[Np1D], List[str]]:
"""Assemble affine blocks of RHS of linear system."""
return (self.affineLinearSystemb(mu, homogeneized),
self.affineWeightsb(mu, homogeneized))
def solve(self, mu : paramList = [], RHS : sampList = None,
homogeneized : bool = False) -> sampList:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
"""
mu, _ = self.checkParameterList(mu)
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
sol = emptySampleList()
if len(mu) > 0:
if RHS is None:
RHS = [self.b(m, homogeneized = homogeneized) for m in mu]
RHS = sampleList(RHS)
mult = 0 if len(RHS) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(RHS), "Sample size")
u = self.baseHF._solver(self.A(mu[0]), RHS[0],
self.baseHF._solverArgs)
sol.reset((len(u), len(mu)), dtype = u.dtype)
sol[0] = u
for j in range(1, len(mu), self.baseHF._solveBatchSize):
if self.baseHF._solveBatchSize != 1:
iRange = list(range(j, min(j + self.baseHF._solveBatchSize,
len(mu))))
As = [self.A(mu[i]) for i in iRange]
bs = [RHS[mult * i] for i in iRange]
A, b = tensorizeLS(As, bs)
else:
A, b = self.A(mu[j]), RHS[mult * j]
solStack = self.baseHF._solver(A, b, self.baseHF._solverArgs)
if self.baseHF._solveBatchSize != 1:
sol[iRange] = detensorizeLS(solStack, len(iRange))
else:
sol[j] = solStack
return sol
def residual(self, u:sampList, mu : paramList = [],
homogeneized : bool = False,
duality : bool = True) -> sampList:
"""
Find residual of linear system for given approximate solution.
Args:
u: numpy complex array with function dofs. If None, set to 0.
mu: parameter value.
"""
mu, _ = self.checkParameterList(mu)
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
if u is not None:
u = sampleList(u)
mult = 0 if len(u) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size")
res = emptySampleList()
if duality and not hasattr(self, "dualityMatrix"):
self.buildDualityPairingForm()
for j in range(len(mu)):
b = self.b(mu[j], homogeneized = homogeneized)
if u is None:
r = b
else:
r = b - self.A(mu[j]).dot(u[mult * j])
if j == 0:
res.reset((len(r), len(mu)), dtype = r.dtype)
if duality:
r = self.dualityMatrix.dot(r)
res[j] = r
return res
def _rayleighQuotient(self, *args, **kwargs) -> float:
return self.baseHF._rayleighQuotient(*args, **kwargs)
def stabilityFactor(self, u:sampList, mu : paramList = [],
nIterP : int = 10, nIterR : int = 10) -> sampList:
"""
Find stability factor of matrix of linear system using iterative
inverse power iteration- and Rayleigh quotient-based procedure.
Args:
u: numpy complex arrays with function dofs.
mu: parameter values.
nIterP: number of iterations of power method.
nIterR: number of iterations of Rayleigh quotient method.
"""
mu, _ = self.checkParameterList(mu)
if mu.shape[0] == 0: mu.reset((1, self.npar), mu.dtype)
u = sampleList(u)
mult = 0 if len(u) == 1 else 1
RROMPyAssert(mult * (len(mu) - 1) + 1, len(u), "Sample size")
stabFact = np.empty(len(mu), dtype = float)
if not hasattr(self, "energyNormMatrix"):
self.buildEnergyNormForm()
for j in range(len(mu)):
stabFact[j] = self._rayleighQuotient(self.A(mu[j]), u[mult * j],
self.energyNormMatrix,
0., nIterP, nIterR)
return stabFact
def plot(self, *args, **kwargs):
"""
Do some nice plots of the complex-valued function with given dofs.
"""
self.baseHF.plot(*args, **kwargs)
def plotmesh(self, *args, **kwargs):
"""
Do a nice plot of the mesh.
"""
self.baseHF.plotmesh(*args, **kwargs)
def outParaview(self, *args, **kwargs):
"""
Output complex-valued function with given dofs to ParaView file.
"""
self.baseHF.outParaview(*args, **kwargs)
def outParaviewTimeDomain(self, *args, **kwargs):
"""
Output complex-valued function with given dofs to ParaView file,
converted to time domain.
"""
self.baseHF.outParaviewTimeDomain(*args, **kwargs)

Event Timeline