Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F60721182
hfengine_base.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, May 2, 04:47
Size
12 KB
Mime Type
text/x-python
Expires
Sat, May 4, 04:47 (2 d)
Engine
blob
Format
Raw Data
Handle
17402346
Attached To
R6746 RationalROMPy
hfengine_base.py
View Options
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from
abc
import
abstractmethod
import
numpy
as
np
import
scipy.sparse
as
scsp
from
numbers
import
Number
from
collections.abc
import
Iterable
from
copy
import
copy
as
softcopy
from
rrompy.utilities.base.decorators
import
nonaffine_construct
from
rrompy.utilities.base.types
import
(
Np1D
,
Np2D
,
List
,
DictAny
,
paramVal
,
paramList
,
sampList
)
from
rrompy.utilities.numerical
import
solve
as
tsolve
,
dot
,
pseudoInverse
from
rrompy.utilities.expression
import
expressionEvaluator
from
rrompy.utilities.exception_manager
import
RROMPyAssert
from
rrompy.sampling.sample_list
import
sampleList
from
rrompy.parameter
import
(
checkParameter
,
checkParameterList
,
parameterList
,
parameterMap
as
pMap
)
from
rrompy.solver.linear_solver
import
setupSolver
from
rrompy.utilities.parallel
import
(
poolRank
,
masterCore
,
listScatter
,
matrixGatherv
,
isend
,
recv
)
__all__
=
[
'HFEngineBase'
]
class
HFEngineBase
:
"""Generic solver for parametric problems."""
def
__init__
(
self
,
verbosity
:
int
=
10
,
timestamp
:
bool
=
True
):
self
.
verbosity
=
verbosity
self
.
timestamp
=
timestamp
self
.
setSolver
(
"SPSOLVE"
,
{
"use_umfpack"
:
False
})
self
.
npar
=
0
self
.
_C
=
None
self
.
outputNormMatrix
=
1.
def
name
(
self
)
->
str
:
return
self
.
__class__
.
__name__
def
__str__
(
self
)
->
str
:
return
self
.
name
()
def
__repr__
(
self
)
->
str
:
return
self
.
__str__
()
+
" at "
+
hex
(
id
(
self
))
def
__dir_base__
(
self
):
return
[
x
for
x
in
self
.
__dir__
()
if
x
[:
2
]
!=
"__"
]
def
__deepcopy__
(
self
,
memo
):
return
softcopy
(
self
)
@property
def
npar
(
self
):
"""Value of npar."""
return
self
.
_npar
@npar.setter
def
npar
(
self
,
npar
):
nparOld
=
self
.
_npar
if
hasattr
(
self
,
"_npar"
)
else
-
1
if
npar
!=
nparOld
:
self
.
parameterMap
=
pMap
(
1.
,
npar
)
self
.
_npar
=
npar
@property
def
spacedim
(
self
):
return
1
def
checkParameter
(
self
,
mu
:
paramVal
)
->
paramVal
:
muP
=
checkParameter
(
mu
,
self
.
npar
)
if
self
.
npar
==
0
:
muP
.
reset
((
1
,
0
),
muP
.
dtype
)
return
muP
def
checkParameterList
(
self
,
mu
:
paramList
,
check_if_single
:
bool
=
False
)
->
paramList
:
muL
=
checkParameterList
(
mu
,
self
.
npar
,
check_if_single
)
return
muL
def
mapParameterList
(
self
,
mu
:
paramList
,
direct
:
str
=
"F"
,
idx
:
List
[
int
]
=
None
)
->
paramList
:
if
idx
is
None
:
idx
=
np
.
arange
(
self
.
npar
)
muMapped
=
checkParameterList
(
mu
,
len
(
idx
))
for
j
,
d
in
enumerate
(
idx
):
muMapped
.
data
[:,
j
]
=
expressionEvaluator
(
self
.
parameterMap
[
direct
][
d
],
muMapped
(
j
))
.
flatten
()
return
muMapped
def
buildEnergyNormForm
(
self
):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self
.
energyNormMatrix
=
1.
def
buildEnergyNormDualForm
(
self
):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
self
.
energyNormDualMatrix
=
1.
def
innerProduct
(
self
,
u
:
Np2D
,
v
:
Np2D
,
onlyDiag
:
bool
=
False
,
dual
:
bool
=
False
,
is_state
:
bool
=
True
)
->
Np2D
:
"""Scalar product."""
if
is_state
or
self
.
isCEye
:
if
dual
:
if
not
hasattr
(
self
,
"energyNormDualMatrix"
):
self
.
buildEnergyNormDualForm
()
energyMat
=
self
.
energyNormDualMatrix
else
:
if
not
hasattr
(
self
,
"energyNormMatrix"
):
self
.
buildEnergyNormForm
()
energyMat
=
self
.
energyNormMatrix
else
:
energyMat
=
self
.
outputNormMatrix
if
isinstance
(
u
,
(
parameterList
,
sampleList
)):
u
=
u
.
data
if
isinstance
(
v
,
(
parameterList
,
sampleList
)):
v
=
v
.
data
if
onlyDiag
:
return
np
.
sum
(
dot
(
energyMat
,
u
)
*
v
.
conj
(),
axis
=
0
)
return
dot
(
dot
(
energyMat
,
u
)
.
T
,
v
.
conj
())
.
T
def
norm
(
self
,
u
:
Np2D
,
dual
:
bool
=
False
,
is_state
:
bool
=
True
)
->
Np1D
:
return
np
.
abs
(
self
.
innerProduct
(
u
,
u
,
onlyDiag
=
True
,
dual
=
dual
,
is_state
=
is_state
))
**
.
5
def
baselineA
(
self
):
"""Return 0 of shape consistent with operator of linear system."""
if
(
hasattr
(
self
,
"As"
)
and
isinstance
(
self
.
As
,
Iterable
)
and
self
.
As
[
0
]
is
not
None
):
d
=
self
.
As
[
0
]
.
shape
[
0
]
else
:
d
=
self
.
spacedim
return
scsp
.
csr_matrix
((
np
.
zeros
(
0
),
np
.
zeros
(
0
),
np
.
zeros
(
d
+
1
)),
shape
=
(
d
,
d
),
dtype
=
np
.
complex
)
def
baselineb
(
self
):
"""Return 0 of shape consistent with RHS of linear system."""
return
np
.
zeros
(
self
.
spacedim
,
dtype
=
np
.
complex
)
@nonaffine_construct
@abstractmethod
def
A
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
Np2D
:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
return
@nonaffine_construct
@abstractmethod
def
b
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
Np1D
:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
return
@property
def
C
(
self
):
"""Value of C."""
if
self
.
_C
is
None
:
self
.
_C
=
1.
return
self
.
_C
@property
def
isCEye
(
self
):
return
isinstance
(
self
.
C
,
Number
)
def
applyC
(
self
,
u
:
sampList
):
"""Apply LHS of linear system."""
return
dot
(
self
.
C
,
u
)
def
applyCpInv
(
self
,
u
:
sampList
):
"""Apply pseudoinverse of LHS of linear system."""
return
dot
(
pseudoInverse
(
self
.
C
),
u
)
def
setSolver
(
self
,
solverType
:
str
,
solverArgs
:
DictAny
=
{}):
"""Choose solver type and parameters."""
self
.
_solver
,
self
.
_solverArgs
=
setupSolver
(
solverType
,
solverArgs
)
def
solve
(
self
,
mu
:
paramList
=
[],
RHS
:
sampList
=
None
,
return_state
:
bool
=
False
)
->
sampList
:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
return_state: whether to return state before multiplication by c.
Defaults to False.
"""
from
rrompy.sampling
import
sampleList
,
emptySampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)
if
len
(
mu
)
==
0
:
return
emptySampleList
()
mu
,
idx
,
sizes
=
listScatter
(
mu
,
return_sizes
=
True
)
mu
=
self
.
checkParameterList
(
mu
)
req
,
emptyCores
=
[],
np
.
where
(
np
.
logical_not
(
sizes
))[
0
]
if
len
(
mu
)
==
0
:
uL
,
uT
=
recv
(
source
=
0
,
tag
=
poolRank
())
sol
=
np
.
empty
((
uL
,
0
),
dtype
=
uT
)
else
:
if
RHS
is
None
:
# build RHSs
RHS
=
sampleList
([
self
.
b
(
m
)
for
m
in
mu
])
else
:
RHS
=
sampleList
(
RHS
)
if
len
(
RHS
)
>
1
:
RHS
=
sampleList
([
RHS
[
i
]
for
i
in
idx
])
mult
=
0
if
len
(
RHS
)
==
1
else
1
RROMPyAssert
(
mult
*
(
len
(
mu
)
-
1
)
+
1
,
len
(
RHS
),
"Sample size"
)
for
j
,
mj
in
enumerate
(
mu
):
u
=
tsolve
(
self
.
A
(
mj
),
RHS
[
mult
*
j
],
self
.
_solver
,
self
.
_solverArgs
)
if
j
==
0
:
sol
=
np
.
empty
((
len
(
u
),
len
(
mu
)),
dtype
=
u
.
dtype
)
if
masterCore
():
for
dest
in
emptyCores
:
req
+=
[
isend
((
len
(
u
),
u
.
dtype
),
dest
=
dest
,
tag
=
dest
)]
sol
[:,
j
]
=
u
if
not
return_state
:
sol
=
self
.
applyC
(
sol
)
for
r
in
req
:
r
.
wait
()
return
sampleList
(
matrixGatherv
(
sol
,
sizes
))
def
residual
(
self
,
mu
:
paramList
=
[],
u
:
sampList
=
None
,
post_c
:
bool
=
True
)
->
sampList
:
"""
Find residual of linear system for given approximate solution.
Args:
mu: parameter value.
u: numpy complex array with function dofs. If None, set to 0.
post_c: whether to post-process using c. Defaults to True.
"""
from
rrompy.sampling
import
sampleList
,
emptySampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)
if
len
(
mu
)
==
0
:
return
emptySampleList
()
mu
,
idx
,
sizes
=
listScatter
(
mu
,
return_sizes
=
True
)
mu
=
self
.
checkParameterList
(
mu
)
req
,
emptyCores
=
[],
np
.
where
(
np
.
logical_not
(
sizes
))[
0
]
if
len
(
mu
)
==
0
:
uL
,
uT
=
recv
(
source
=
0
,
tag
=
poolRank
())
res
=
np
.
empty
((
uL
,
0
),
dtype
=
uT
)
else
:
v
=
sampleList
(
np
.
zeros
((
self
.
spacedim
,
len
(
mu
))))
if
u
is
not
None
:
u
=
sampleList
(
u
)
v
=
v
+
sampleList
([
u
[
i
]
for
i
in
idx
])
for
j
,
(
mj
,
vj
)
in
enumerate
(
zip
(
mu
,
v
)):
r
=
self
.
b
(
mj
)
-
dot
(
self
.
A
(
mj
),
vj
)
if
j
==
0
:
res
=
np
.
empty
((
len
(
r
),
len
(
mu
)),
dtype
=
r
.
dtype
)
if
masterCore
():
for
dest
in
emptyCores
:
req
+=
[
isend
((
len
(
r
),
r
.
dtype
),
dest
=
dest
,
tag
=
dest
)]
res
[:,
j
]
=
r
if
post_c
:
res
=
self
.
applyC
(
res
)
for
r
in
req
:
r
.
wait
()
return
sampleList
(
matrixGatherv
(
res
,
sizes
))
cutOffPolesRMax
,
cutOffPolesRMin
=
np
.
inf
,
-
np
.
inf
cutOffPolesRMaxRel
,
cutOffPolesRMinRel
=
np
.
inf
,
-
np
.
inf
cutOffPolesIMax
,
cutOffPolesIMin
=
np
.
inf
,
-
np
.
inf
cutOffPolesIMaxRel
,
cutOffPolesIMinRel
=
np
.
inf
,
-
np
.
inf
cutOffResNormMin
=
-
1
def
flagBadPolesResidues
(
self
,
poles
:
Np1D
,
residues
:
Np1D
=
None
,
relative
:
bool
=
False
)
->
Np1D
:
"""
Flag (numerical) poles/residues which are impossible.
Args:
poles: poles to be judged.
residues: residues to be judged.
relative: whether relative values should be used for poles.
"""
poles
=
np
.
array
(
poles
)
.
flatten
()
flag
=
np
.
zeros
(
len
(
poles
),
dtype
=
bool
)
if
residues
is
None
:
self
.
_ignoreResidues
=
self
.
cutOffResNormMin
<=
0.
if
relative
:
RMax
,
RMin
=
self
.
cutOffPolesRMaxRel
,
self
.
cutOffPolesRMinRel
IMax
,
IMin
=
self
.
cutOffPolesIMaxRel
,
self
.
cutOffPolesIMinRel
else
:
RMax
,
RMin
=
self
.
cutOffPolesRMax
,
self
.
cutOffPolesRMin
IMax
,
IMin
=
self
.
cutOffPolesIMax
,
self
.
cutOffPolesIMin
if
not
np
.
isinf
(
RMax
):
flag
=
np
.
logical_or
(
flag
,
np
.
real
(
poles
)
>
RMax
)
if
not
np
.
isinf
(
RMin
):
flag
=
np
.
logical_or
(
flag
,
np
.
real
(
poles
)
<
RMin
)
if
not
np
.
isinf
(
IMax
):
flag
=
np
.
logical_or
(
flag
,
np
.
imag
(
poles
)
>
IMax
)
if
not
np
.
isinf
(
IMin
):
flag
=
np
.
logical_or
(
flag
,
np
.
imag
(
poles
)
<
IMin
)
else
:
residues
=
np
.
array
(
residues
)
.
reshape
(
len
(
poles
),
-
1
)
if
self
.
cutOffResNormMin
>
0.
:
if
residues
.
shape
[
1
]
==
self
.
spacedim
:
resEff
=
self
.
norm
(
residues
.
T
)
else
:
resEff
=
np
.
linalg
.
norm
(
residues
,
axis
=
1
)
resEff
/=
np
.
max
(
resEff
)
flag
=
np
.
logical_or
(
flag
,
resEff
<
self
.
cutOffResNormMin
)
return
flag
Event Timeline
Log In to Comment