Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F78280617
hfengine_base.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Aug 19, 14:25
Size
15 KB
Mime Type
text/x-python
Expires
Wed, Aug 21, 14:25 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
20017960
Attached To
R6746 RationalROMPy
hfengine_base.py
View Options
# Copyright (C) 2018-2020 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from
abc
import
abstractmethod
import
numpy
as
np
import
scipy.sparse
as
scsp
from
numbers
import
Number
from
collections.abc
import
Iterable
from
copy
import
copy
as
softcopy
from
rrompy.utilities.base.decorators
import
(
nonaffine_construct
,
mu_independent
)
from
rrompy.utilities.base.types
import
(
Np1D
,
Np2D
,
Tuple
,
List
,
DictAny
,
paramVal
,
paramList
,
sampList
)
from
rrompy.utilities.numerical
import
solve
as
tsolve
,
dot
,
potential
from
rrompy.utilities.expression
import
expressionEvaluator
from
rrompy.utilities.exception_manager
import
RROMPyAssert
from
rrompy.sampling.sample_list
import
sampleList
from
rrompy.parameter
import
(
checkParameter
,
checkParameterList
,
parameterList
,
parameterMap
as
pMap
)
from
rrompy.solver.linear_solver
import
setupSolver
from
rrompy.utilities.parallel
import
(
poolRank
,
masterCore
,
listScatter
,
matrixGatherv
,
isend
,
recv
)
__all__
=
[
'HFEngineBase'
]
class
HFEngineBase
:
"""Generic solver for parametric problems."""
def
__init__
(
self
,
verbosity
:
int
=
10
,
timestamp
:
bool
=
True
):
self
.
verbosity
=
verbosity
self
.
timestamp
=
timestamp
self
.
setSolver
(
"SPSOLVE"
,
{
"use_umfpack"
:
False
})
self
.
npar
=
0
self
.
_C
=
1.
def
name
(
self
)
->
str
:
return
self
.
__class__
.
__name__
def
__str__
(
self
)
->
str
:
return
self
.
name
()
def
__repr__
(
self
)
->
str
:
return
self
.
__str__
()
+
" at "
+
hex
(
id
(
self
))
def
__dir_base__
(
self
):
return
[
x
for
x
in
self
.
__dir__
()
if
x
[:
2
]
!=
"__"
]
def
__deepcopy__
(
self
,
memo
):
return
softcopy
(
self
)
@property
def
npar
(
self
):
"""Value of npar."""
return
self
.
_npar
@npar.setter
def
npar
(
self
,
npar
):
nparOld
=
self
.
_npar
if
hasattr
(
self
,
"_npar"
)
else
-
1
if
npar
!=
nparOld
:
self
.
parameterMap
=
pMap
(
1.
,
npar
)
self
.
_npar
=
npar
@property
def
spacedim
(
self
):
return
1
@property
def
outputdim
(
self
):
if
self
.
isCEye
:
return
self
.
spacedim
return
self
.
_C
.
shape
[
0
]
def
checkParameter
(
self
,
mu
:
paramVal
)
->
paramVal
:
muP
=
checkParameter
(
mu
,
self
.
npar
)
if
self
.
npar
==
0
:
muP
.
reset
((
1
,
0
),
np
.
complex
)
return
muP
def
checkParameterList
(
self
,
mu
:
paramList
,
check_if_single
:
bool
=
False
)
->
paramList
:
muL
=
checkParameterList
(
mu
,
self
.
npar
,
check_if_single
)
return
muL
def
mapParameterList
(
self
,
mu
:
paramList
,
direct
:
str
=
"F"
,
idx
:
List
[
int
]
=
None
)
->
paramList
:
if
idx
is
None
:
idx
=
np
.
arange
(
self
.
npar
)
muMapped
=
checkParameterList
(
mu
,
len
(
idx
))
for
j
,
d
in
enumerate
(
idx
):
muMapped
.
data
[:,
j
]
=
expressionEvaluator
(
self
.
parameterMap
[
direct
][
d
],
muMapped
(
j
))
.
flatten
()
return
muMapped
@property
def
energyNormMatrix
(
self
):
if
not
hasattr
(
self
,
"_energyNormMatrix"
):
self
.
buildEnergyNormForm
()
return
self
.
_energyNormMatrix
def
buildEnergyNormForm
(
self
):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self
.
_energyNormMatrix
=
1.
@property
def
energyNormDualMatrix
(
self
):
if
not
hasattr
(
self
,
"_energyNormDualMatrix"
):
self
.
buildEnergyNormDualForm
()
return
self
.
_energyNormDualMatrix
def
buildEnergyNormDualForm
(
self
):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
self
.
_energyNormDualMatrix
=
1.
@property
def
energyNormOutputMatrix
(
self
):
if
not
hasattr
(
self
,
"_energyNormOutputMatrix"
):
self
.
buildEnergyNormOutput
()
return
self
.
_energyNormOutputMatrix
def
buildEnergyNormOutput
(
self
):
"""
Build sparse matrix (in CSR format) representative of scalar product
over output space.
"""
self
.
_energyNormOutputMatrix
=
1.
def
innerProduct
(
self
,
u
:
Np2D
,
v
:
Np2D
,
onlyDiag
:
bool
=
False
,
dual
:
bool
=
False
,
is_state
:
bool
=
True
)
->
Np2D
:
"""Scalar product."""
if
is_state
or
self
.
isCEye
:
if
dual
:
energyMat
=
self
.
energyNormDualMatrix
else
:
energyMat
=
self
.
energyNormMatrix
else
:
energyMat
=
self
.
energyNormOutputMatrix
if
isinstance
(
u
,
(
parameterList
,
sampleList
)):
u
=
u
.
data
if
isinstance
(
v
,
(
parameterList
,
sampleList
)):
v
=
v
.
data
if
onlyDiag
:
return
np
.
sum
(
dot
(
energyMat
,
u
)
*
v
.
conj
(),
axis
=
0
)
return
dot
(
dot
(
energyMat
,
u
)
.
T
,
v
.
conj
())
.
T
def
norm
(
self
,
u
:
Np2D
,
dual
:
bool
=
False
,
is_state
:
bool
=
True
)
->
Np1D
:
return
np
.
abs
(
self
.
innerProduct
(
u
,
u
,
onlyDiag
=
True
,
dual
=
dual
,
is_state
=
is_state
))
**
.
5
def
baselineA
(
self
):
"""Return 0 of shape consistent with operator of linear system."""
if
(
hasattr
(
self
,
"As"
)
and
isinstance
(
self
.
As
,
Iterable
)
and
self
.
As
[
0
]
is
not
None
):
d
=
self
.
As
[
0
]
.
shape
[
0
]
else
:
d
=
self
.
spacedim
return
scsp
.
csr_matrix
((
np
.
zeros
(
0
),
np
.
zeros
(
0
),
np
.
zeros
(
d
+
1
)),
shape
=
(
d
,
d
),
dtype
=
np
.
complex
)
def
baselineb
(
self
):
"""Return 0 of shape consistent with RHS of linear system."""
return
np
.
zeros
(
self
.
spacedim
,
dtype
=
np
.
complex
)
@nonaffine_construct
@abstractmethod
def
A
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
Np2D
:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
return
@nonaffine_construct
@abstractmethod
def
b
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
Np1D
:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
return
@mu_independent
def
C
(
self
,
mu
:
paramVal
):
"""
Value of C. Should be overridden (with something like
return self._C(mu)
) if a mu-dependent C is needed.
"""
return
self
.
_C
@property
def
isCEye
(
self
):
"""
Whether the action of C can be seen as a scalar multiplication. Should
be overridden (with
return True
) if a mu-dependent scalar C is used.
"""
return
isinstance
(
self
.
_C
,
Number
)
def
applyC
(
self
,
u
:
sampList
,
mu
:
paramVal
):
"""Apply LHS of linear system."""
return
dot
(
self
.
C
(
mu
),
u
)
def
setSolver
(
self
,
solverType
:
str
,
solverArgs
:
DictAny
=
{}):
"""Choose solver type and parameters."""
self
.
_solver
,
self
.
_solverArgs
=
setupSolver
(
solverType
,
solverArgs
)
def
solve
(
self
,
mu
:
paramList
=
[],
RHS
:
sampList
=
None
,
return_state
:
bool
=
False
)
->
sampList
:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
return_state: whether to return state before multiplication by c.
Defaults to False.
"""
from
rrompy.sampling
import
sampleList
,
emptySampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)
if
len
(
mu
)
==
0
:
return
emptySampleList
()
req
,
is_master
=
[],
masterCore
()
mu_loc
,
idx
,
sizes
=
listScatter
(
mu
,
return_sizes
=
True
)
mu_loc
=
self
.
checkParameterList
(
mu_loc
)
emptyCores
=
np
.
where
(
np
.
array
(
sizes
)
==
0
)[
0
]
if
is_master
else
[]
if
len
(
mu_loc
)
==
0
:
uL
=
recv
(
source
=
0
,
tag
=
poolRank
())
sol
=
np
.
empty
((
uL
,
0
),
dtype
=
np
.
complex
)
else
:
if
RHS
is
None
:
# build RHSs
RHS
=
sampleList
([
self
.
b
(
m
)
for
m
in
mu_loc
])
else
:
RHS
=
sampleList
(
RHS
)
if
len
(
RHS
)
>
1
:
RHS
=
sampleList
([
RHS
[
i
]
for
i
in
idx
])
mult
=
0
if
len
(
RHS
)
==
1
else
1
RROMPyAssert
(
mult
*
(
len
(
mu_loc
)
-
1
)
+
1
,
len
(
RHS
),
"Sample size"
)
for
j
,
mj
in
enumerate
(
mu_loc
):
u
=
tsolve
(
self
.
A
(
mj
),
RHS
[
mult
*
j
],
self
.
_solver
,
self
.
_solverArgs
)
if
not
return_state
:
u
=
self
.
applyC
(
u
,
mj
)
if
j
==
0
:
sol
=
np
.
empty
((
len
(
u
),
len
(
mu_loc
)),
dtype
=
np
.
complex
)
for
dest
in
emptyCores
:
req
+=
[
isend
(
len
(
u
),
dest
=
dest
,
tag
=
dest
)]
sol
[:,
j
]
=
u
for
r
in
req
:
r
.
wait
()
sol
=
matrixGatherv
(
sol
,
sizes
)
return
sampleList
(
sol
)
def
residual
(
self
,
mu
:
paramList
=
[],
u
:
sampList
=
None
,
post_c
:
bool
=
True
)
->
sampList
:
"""
Find residual of linear system for given approximate solution.
Args:
mu: parameter value.
u: numpy complex array with function dofs. If None, set to 0.
post_c: whether to post-process using c. Defaults to True.
"""
from
rrompy.sampling
import
sampleList
,
emptySampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)
if
len
(
mu
)
==
0
:
return
emptySampleList
()
req
,
is_master
=
[],
masterCore
()
mu_loc
,
idx
,
sizes
=
listScatter
(
mu
,
return_sizes
=
True
)
mu_loc
=
self
.
checkParameterList
(
mu_loc
)
emptyCores
=
np
.
where
(
np
.
array
(
sizes
)
==
0
)[
0
]
if
is_master
else
[]
if
len
(
mu_loc
)
==
0
:
uL
=
recv
(
source
=
0
,
tag
=
poolRank
())
res
=
np
.
empty
((
uL
,
0
),
dtype
=
np
.
complex
)
else
:
v
=
sampleList
(
np
.
zeros
((
self
.
spacedim
,
len
(
mu_loc
))))
if
u
is
not
None
:
u
=
sampleList
(
u
)
v
=
v
+
sampleList
([
u
[
i
]
for
i
in
idx
])
for
j
,
(
mj
,
vj
)
in
enumerate
(
zip
(
mu_loc
,
v
)):
r
=
self
.
b
(
mj
)
-
dot
(
self
.
A
(
mj
),
vj
)
if
post_c
:
r
=
self
.
applyC
(
r
,
mj
)
if
j
==
0
:
res
=
np
.
empty
((
len
(
r
),
len
(
mu_loc
)),
dtype
=
np
.
complex
)
for
dest
in
emptyCores
:
req
+=
[
isend
(
len
(
r
),
dest
=
dest
,
tag
=
dest
)]
res
[:,
j
]
=
r
for
r
in
req
:
r
.
wait
()
res
=
matrixGatherv
(
res
,
sizes
)
return
sampleList
(
res
)
cutOffPolesRMax
,
cutOffPolesRMin
=
np
.
inf
,
-
np
.
inf
cutOffPolesIMax
,
cutOffPolesIMin
=
np
.
inf
,
-
np
.
inf
def
flagBadPolesResiduesAbsolute
(
self
,
poles
:
Np1D
,
residues
:
Np1D
=
None
,
projMat
:
Np2D
=
None
)
->
Np1D
:
"""
Flag (numerical) poles/residues which are impossible.
Args:
poles: poles to be judged.
residues: residues norms to be judged.
projMat: matrix for projection of residues.
"""
poles
=
np
.
array
(
poles
)
.
flatten
()
flag
=
np
.
zeros
(
len
(
poles
),
dtype
=
bool
)
RMax
,
RMin
=
self
.
cutOffPolesRMax
,
self
.
cutOffPolesRMin
IMax
,
IMin
=
self
.
cutOffPolesIMax
,
self
.
cutOffPolesIMin
if
not
np
.
isinf
(
RMax
):
flag
=
flag
+
(
np
.
real
(
poles
)
>
RMax
)
if
not
np
.
isinf
(
RMin
):
flag
=
flag
+
(
np
.
real
(
poles
)
<
RMin
)
if
not
np
.
isinf
(
IMax
):
flag
=
flag
+
(
np
.
imag
(
poles
)
>
IMax
)
if
not
np
.
isinf
(
IMin
):
flag
=
flag
+
(
np
.
imag
(
poles
)
<
IMin
)
return
flag
cutOffPolesPotentialMax
=
np
.
inf
cutOffPolesRMaxRel
,
cutOffPolesRMinRel
=
np
.
inf
,
-
np
.
inf
cutOffPolesIMaxRel
,
cutOffPolesIMinRel
=
np
.
inf
,
-
np
.
inf
cutOffResNormMin
=
-
1
cutOffResAngleMin
,
cutOffResAngleMax
=
-
1
,
np
.
pi
+
1
def
flagBadPolesResiduesRelative
(
self
,
poles
:
Np1D
,
residues
:
Np1D
=
None
,
projMat
:
Np2D
=
None
,
foci
:
Tuple
[
float
,
float
]
=
[
-
1.
,
1.
])
\
->
Np1D
:
"""
Flag (numerical) poles/residues which are impossible.
Args:
poles: poles to be judged.
residues: residues norms to be judged.
projMat: matrix for projection of residues.
foci: foci for potential evaluation.
"""
poles
=
np
.
array
(
poles
)
.
flatten
()
flag
=
np
.
zeros
(
len
(
poles
),
dtype
=
bool
)
potMax
=
self
.
cutOffPolesPotentialMax
RMax
,
RMin
=
self
.
cutOffPolesRMaxRel
,
self
.
cutOffPolesRMinRel
IMax
,
IMin
=
self
.
cutOffPolesIMaxRel
,
self
.
cutOffPolesIMinRel
if
not
np
.
isinf
(
potMax
)
or
(
residues
is
not
None
and
not
self
.
_ignoreResidues
):
plsInf
=
np
.
isinf
(
poles
)
pot
=
potential
(
poles
,
foci
)
if
not
np
.
isinf
(
potMax
):
flag
=
flag
+
(
pot
>
potMax
)
if
not
np
.
isinf
(
RMax
):
flag
=
flag
+
(
np
.
real
(
poles
)
>
RMax
)
if
not
np
.
isinf
(
RMin
):
flag
=
flag
+
(
np
.
real
(
poles
)
<
RMin
)
if
not
np
.
isinf
(
IMax
):
flag
=
flag
+
(
np
.
imag
(
poles
)
>
IMax
)
if
not
np
.
isinf
(
IMin
):
flag
=
flag
+
(
np
.
imag
(
poles
)
<
IMin
)
if
residues
is
not
None
and
not
self
.
_ignoreResidues
:
residues
=
np
.
array
(
residues
)
.
reshape
(
-
1
,
len
(
poles
))
resGood
=
np
.
where
(
flag
+
plsInf
==
False
)[
0
]
if
len
(
resGood
)
>
0
:
residues
=
residues
[:,
resGood
]
/
pot
[
resGood
]
if
projMat
is
None
:
resNorm
=
np
.
linalg
.
norm
(
residues
,
axis
=
0
)
else
:
residues
=
projMat
.
dot
(
residues
)
resNorm
=
self
.
norm
(
residues
)
if
self
.
cutOffResNormMin
>
0.
:
flag
[
resGood
[
resNorm
<
self
.
cutOffResNormMin
*
np
.
max
(
resNorm
)]]
=
1
resGood
=
np
.
where
(
flag
+
plsInf
==
False
)[
0
]
if
len
(
resGood
)
>
0
and
(
self
.
cutOffResAngleMin
>
0.
or
self
.
cutOffResAngleMax
<
np
.
pi
):
if
projMat
is
None
:
angles
=
np
.
real
(
residues
.
T
.
conj
()
.
dot
(
residues
))
else
:
angles
=
np
.
real
(
self
.
innerProduct
(
residues
,
residues
))
resNormEff
=
resNorm
resNormEff
[
np
.
isclose
(
resNormEff
,
0.
,
atol
=
1e-15
)]
=
1.
angles
=
np
.
clip
((
angles
/
resNormEff
)
.
T
/
resNormEff
,
-
1.
,
1.
)
angles
=
np
.
arccos
(
angles
)
badangles
=
((
angles
<
self
.
cutOffResAngleMin
)
+
(
angles
>
self
.
cutOffResAngleMax
))
badangles
[
np
.
arange
(
len
(
angles
)),
np
.
arange
(
len
(
angles
))]
=
0
idx
=
np
.
zeros
(
len
(
angles
),
dtype
=
bool
)
while
np
.
sum
(
badangles
)
>
0
:
idxn
=
np
.
argmax
(
np
.
sum
(
badangles
,
axis
=
1
))
badangles
[
idxn
],
badangles
[:,
idxn
]
=
0
,
0
idx
[
idxn
]
=
True
flag
[
resGood
[
idx
]]
=
1
return
flag
>
0
@property
def
_ignoreResidues
(
self
):
return
(
self
.
cutOffResNormMin
<=
0.
and
self
.
cutOffResAngleMin
<=
0.
and
self
.
cutOffResAngleMax
>=
np
.
pi
)
Event Timeline
Log In to Comment