Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F62100124
hfengine_base.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, May 10, 22:03
Size
10 KB
Mime Type
text/x-python
Expires
Sun, May 12, 22:03 (2 d)
Engine
blob
Format
Raw Data
Handle
17603374
Attached To
R6746 RationalROMPy
hfengine_base.py
View Options
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from
abc
import
abstractmethod
import
numpy
as
np
import
scipy.sparse
as
scsp
from
numbers
import
Number
from
copy
import
copy
as
softcopy
,
deepcopy
as
copy
from
rrompy.utilities.base.decorators
import
nonaffine_construct
from
rrompy.utilities.base.types
import
(
Np1D
,
Np2D
,
List
,
DictAny
,
paramVal
,
paramList
,
sampList
)
from
rrompy.utilities.numerical
import
solve
as
tsolve
,
dot
,
customPInv
from
rrompy.utilities.numerical.rayleigh_quotient_iteration
import
(
rayleighQuotientIteration
)
from
rrompy.utilities.exception_manager
import
RROMPyAssert
from
rrompy.parameter
import
checkParameter
,
checkParameterList
from
rrompy.solver.linear_solver
import
setupSolver
try
:
from
mpi4py
import
MPI
_MPI_IS_LOADED
=
True
from
rrompy.utilities.base.partitioning
import
partition
except
ImportError
:
_MPI_IS_LOADED
=
False
__all__
=
[
'HFEngineBase'
]
class
HFEngineBase
:
"""Generic solver for parametric problems."""
_forceSerial
=
False
def
__init__
(
self
,
verbosity
:
int
=
10
,
timestamp
:
bool
=
True
):
self
.
verbosity
=
verbosity
self
.
timestamp
=
timestamp
self
.
setSolver
(
"SPSOLVE"
,
{
"use_umfpack"
:
False
})
self
.
npar
=
0
self
.
_C
=
None
self
.
outputNormMatrix
=
1.
def
name
(
self
)
->
str
:
return
self
.
__class__
.
__name__
def
__str__
(
self
)
->
str
:
return
self
.
name
()
def
__repr__
(
self
)
->
str
:
return
self
.
__str__
()
+
" at "
+
hex
(
id
(
self
))
def
__dir_base__
(
self
):
return
[
x
for
x
in
self
.
__dir__
()
if
x
[:
2
]
!=
"__"
]
def
__deepcopy__
(
self
,
memo
):
return
softcopy
(
self
)
@property
def
npar
(
self
):
"""Value of npar."""
return
self
.
_npar
@npar.setter
def
npar
(
self
,
npar
):
nparOld
=
self
.
_npar
if
hasattr
(
self
,
"_npar"
)
else
-
1
if
npar
!=
nparOld
:
self
.
rescalingExp
=
[
1.
]
*
npar
self
.
_npar
=
npar
@property
def
spacedim
(
self
):
return
1
def
checkParameter
(
self
,
mu
:
paramVal
):
muP
=
checkParameter
(
mu
,
self
.
npar
)
if
self
.
npar
==
0
:
muP
.
reset
((
1
,
0
),
muP
.
dtype
)
return
muP
def
checkParameterList
(
self
,
mu
:
paramList
):
muL
=
checkParameterList
(
mu
,
self
.
npar
)
if
self
.
npar
==
0
:
muL
[
0
]
.
reset
((
1
,
0
),
muL
[
0
]
.
dtype
)
return
muL
def
buildEnergyNormForm
(
self
):
"""
Build sparse matrix (in CSR format) representative of scalar product.
"""
self
.
energyNormMatrix
=
1.
def
buildEnergyNormDualForm
(
self
):
"""
Build sparse matrix (in CSR format) representative of dual scalar
product without duality.
"""
self
.
energyNormDualMatrix
=
1.
def
innerProduct
(
self
,
u
:
Np2D
,
v
:
Np2D
,
onlyDiag
:
bool
=
False
,
dual
:
bool
=
False
,
is_state
:
bool
=
True
)
->
Np2D
:
"""Scalar product."""
if
is_state
or
self
.
isCEye
:
if
dual
:
if
not
hasattr
(
self
,
"energyNormDualMatrix"
):
self
.
buildEnergyNormDualForm
()
energyMat
=
self
.
energyNormDualMatrix
else
:
if
not
hasattr
(
self
,
"energyNormMatrix"
):
self
.
buildEnergyNormForm
()
energyMat
=
self
.
energyNormMatrix
else
:
energyMat
=
self
.
outputNormMatrix
if
not
isinstance
(
u
,
(
np
.
ndarray
,)):
u
=
u
.
data
if
not
isinstance
(
v
,
(
np
.
ndarray
,)):
v
=
v
.
data
if
onlyDiag
:
return
np
.
sum
(
dot
(
energyMat
,
u
)
*
v
.
conj
(),
axis
=
0
)
return
dot
(
dot
(
energyMat
,
u
)
.
T
,
v
.
conj
())
.
T
def
norm
(
self
,
u
:
Np2D
,
dual
:
bool
=
False
,
is_state
:
bool
=
True
)
->
Np1D
:
return
np
.
abs
(
self
.
innerProduct
(
u
,
u
,
onlyDiag
=
True
,
dual
=
dual
,
is_state
=
is_state
))
**
.
5
def
baselineA
(
self
):
"""Return 0 of shape consistent with operator of linear system."""
if
(
hasattr
(
self
,
"As"
)
and
hasattr
(
self
.
As
,
"__len__"
)
and
self
.
As
[
0
]
is
not
None
):
d
=
self
.
As
[
0
]
.
shape
[
0
]
else
:
d
=
self
.
spacedim
return
scsp
.
csr_matrix
((
np
.
zeros
(
0
),
np
.
zeros
(
0
),
np
.
zeros
(
d
+
1
)),
shape
=
(
d
,
d
),
dtype
=
np
.
complex
)
def
baselineb
(
self
):
"""Return 0 of shape consistent with RHS of linear system."""
return
np
.
zeros
(
self
.
spacedim
,
dtype
=
np
.
complex
)
@nonaffine_construct
@abstractmethod
def
A
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
Np2D
:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
return
@nonaffine_construct
@abstractmethod
def
b
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
Np1D
:
"""
Assemble terms of RHS of linear system and return it (or its
derivative) at a given parameter.
"""
return
@property
def
C
(
self
):
"""Value of C."""
if
self
.
_C
is
None
:
self
.
_C
=
1.
return
self
.
_C
@property
def
isCEye
(
self
):
return
isinstance
(
self
.
C
,
Number
)
def
applyC
(
self
,
u
:
sampList
):
"""Apply LHS of linear system."""
return
dot
(
self
.
C
,
u
)
def
applyCpInv
(
self
,
u
:
sampList
):
"""Apply pseudoinverse of LHS of linear system."""
return
dot
(
customPInv
(
self
.
C
),
u
)
def
setSolver
(
self
,
solverType
:
str
,
solverArgs
:
DictAny
=
{}):
"""Choose solver type and parameters."""
self
.
_solver
,
self
.
_solverArgs
=
setupSolver
(
solverType
,
solverArgs
)
def
solve
(
self
,
mu
:
paramList
=
[],
RHS
:
sampList
=
None
,
return_state
:
bool
=
False
)
->
sampList
:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
return_state: whether to return state before multiplication by c.
Defaults to False.
"""
from
rrompy.sampling
import
sampleList
,
emptySampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)[
0
]
if
len
(
mu
)
==
0
:
return
emptySampleList
()
if
_MPI_IS_LOADED
and
not
self
.
_forceSerial
:
comm
=
MPI
.
COMM_WORLD
crank
=
comm
.
Get_rank
()
if
crank
==
0
:
mupart
,
idxpart
=
partition
(
mu
,
comm
.
Get_size
(),
True
)
else
:
mupart
,
idxpart
=
None
,
None
mu0
=
self
.
checkParameterList
(
mu
)[
0
]
mu
=
self
.
checkParameterList
(
comm
.
scatter
(
mupart
,
root
=
0
))[
0
]
idxpart
=
comm
.
bcast
(
idxpart
,
root
=
0
)
if
len
(
mu
)
==
0
:
sol
=
emptySampleList
()
else
:
if
RHS
is
None
:
RHS
=
sampleList
([
self
.
b
(
m
)
for
m
in
mu
])
else
:
RHS
=
sampleList
(
RHS
)
if
len
(
RHS
)
>
1
:
RHS
=
sampleList
([
RHS
[
x
]
for
x
in
idxpart
])
mult
=
0
if
len
(
RHS
)
==
1
else
1
RROMPyAssert
(
mult
*
(
len
(
mu
)
-
1
)
+
1
,
len
(
RHS
),
"Sample size"
)
for
j
in
range
(
len
(
mu
)):
u
=
tsolve
(
self
.
A
(
mu
[
j
]),
RHS
[
mult
*
j
],
self
.
_solver
,
self
.
_solverArgs
)
if
j
==
0
:
sol
=
emptySampleList
()
sol
.
reset
((
len
(
u
),
len
(
mu
)),
dtype
=
u
.
dtype
)
sol
[
j
]
=
u
if
not
return_state
:
sol
=
sampleList
(
self
.
applyC
(
sol
.
data
))
if
_MPI_IS_LOADED
and
not
self
.
_forceSerial
:
solsdata
=
comm
.
allgather
(
sol
.
data
.
T
)
sol
=
emptySampleList
()
sol
.
reset
((
solsdata
[
0
]
.
shape
[
1
],
len
(
mu0
)),
dtype
=
solsdata
[
0
]
.
dtype
)
for
idx
,
dat
in
zip
(
idxpart
,
solsdata
):
if
len
(
idx
)
>
0
:
sol
[
idx
]
=
dat
return
sol
def
residual
(
self
,
mu
:
paramList
=
[],
u
:
sampList
=
None
,
post_c
:
bool
=
True
)
->
sampList
:
"""
Find residual of linear system for given approximate solution.
Args:
mu: parameter value.
u: numpy complex array with function dofs. If None, set to 0.
post_c: whether to post-process using c. Defaults to True.
"""
from
rrompy.sampling
import
sampleList
,
emptySampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)[
0
]
if
len
(
mu
)
==
0
:
return
emptySampleList
()
v
=
sampleList
(
np
.
zeros
((
self
.
spacedim
,
len
(
mu
))))
if
u
is
not
None
:
v
=
v
+
sampleList
(
u
)
for
j
in
range
(
len
(
mu
)):
r
=
self
.
b
(
mu
[
j
])
-
dot
(
self
.
A
(
mu
[
j
]),
v
[
j
])
if
post_c
:
if
j
==
0
:
res
=
np
.
empty
((
len
(
r
),
len
(
mu
)),
dtype
=
r
.
dtype
)
res
[:,
j
]
=
r
else
:
if
j
==
0
:
res
=
emptySampleList
()
res
.
reset
((
len
(
r
),
len
(
mu
)),
dtype
=
r
.
dtype
)
res
[
j
]
=
r
if
post_c
:
res
=
sampleList
(
self
.
applyC
(
res
))
return
res
def
stabilityFactor
(
self
,
mu
:
paramList
=
[],
u
:
sampList
=
None
,
nIterP
:
int
=
10
,
nIterR
:
int
=
10
)
->
sampList
:
"""
Find stability factor of matrix of linear system using iterative
inverse power iteration- and Rayleigh quotient-based procedure.
Args:
mu: parameter values.
u: numpy complex arrays with function dofs.
nIterP: number of iterations of power method.
nIterR: number of iterations of Rayleigh quotient method.
"""
from
rrompy.sampling
import
sampleList
if
mu
==
[]:
mu
=
self
.
mu0
mu
=
self
.
checkParameterList
(
mu
)[
0
]
u
=
sampleList
(
u
)
if
len
(
u
)
!=
len
(
mu
):
u
=
sampleList
(
np
.
tile
(
u
.
data
.
reshape
(
-
1
,
1
),
(
1
,
len
(
mu
))))
stabFact
=
np
.
empty
(
len
(
mu
),
dtype
=
float
)
if
not
hasattr
(
self
,
"energyNormMatrix"
):
self
.
buildEnergyNormForm
()
for
j
in
range
(
len
(
mu
)):
stabFact
[
j
]
=
rayleighQuotientIteration
(
self
.
A
(
mu
[
j
]),
u
[
j
],
self
.
energyNormMatrix
,
0.
,
nIterP
,
nIterR
,
self
.
_solver
,
self
.
_solverArgs
)
return
stabFact
Event Timeline
Log In to Comment