Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F65791462
marginal_proxy_engine.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, Jun 6, 06:25
Size
16 KB
Mime Type
text/x-python
Expires
Sat, Jun 8, 06:25 (2 d)
Engine
blob
Format
Raw Data
Handle
18127740
Attached To
R6746 RationalROMPy
marginal_proxy_engine.py
View Options
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import
numpy
as
np
import
scipy.sparse
as
scsp
from
copy
import
deepcopy
as
copy
,
copy
as
softcopy
from
rrompy.utilities.base.types
import
(
Np1D
,
Np2D
,
ScOp
,
Tuple
,
List
,
paramVal
,
paramList
,
HFEng
,
sampList
)
from
rrompy.utilities.base
import
multibinom
from
rrompy.utilities.poly_fitting.polynomial
import
(
hashDerivativeToIdx
as
hashD
,
hashIdxToDerivative
as
hashI
)
from
rrompy.utilities.exception_manager
import
RROMPyAssert
from
rrompy.parameter
import
checkParameter
,
checkParameterList
from
rrompy.sampling
import
sampleList
,
emptySampleList
from
rrompy.solver.scipy
import
tensorizeLS
,
detensorizeLS
__all__
=
[
'MarginalProxyEngine'
]
class
MarginalProxyEngine
:
"""
Marginalized should prescribe fixed value for the marginalized parameters
and leave None elsewhere.
"""
def
__init__
(
self
,
HFEngine
:
HFEng
,
marginalized
:
Np1D
):
self
.
baseHF
=
HFEngine
self
.
marg
=
marginalized
self
.
_setupAs
()
self
.
_setupbs
()
@property
def
freeLocations
(
self
):
return
tuple
([
x
for
x
in
range
(
self
.
nparBase
)
if
self
.
marg
[
x
]
is
None
])
@property
def
fixedLocations
(
self
):
return
tuple
([
x
for
x
in
range
(
self
.
nparBase
)
\
if
self
.
marg
[
x
]
is
not
None
])
@property
def
muFixed
(
self
):
muF
=
checkParameter
([
self
.
marg
[
x
]
for
x
in
self
.
fixedLocations
])
if
self
.
nparBase
-
self
.
npar
>
0
:
muF
=
muF
[
0
]
return
muF
@property
def
rescalingExpFree
(
self
):
return
[
self
.
baseHF
.
rescalingExp
[
x
]
for
x
in
self
.
freeLocations
]
@property
def
rescalingExpFixed
(
self
):
return
[
self
.
baseHF
.
rescalingExp
[
x
]
for
x
in
self
.
fixedLocations
]
@property
def
V
(
self
):
return
self
.
baseHF
.
V
def
name
(
self
)
->
str
:
return
"{}-proxy for {}"
.
format
(
self
.
freeLocations
,
self
.
baseHF
.
name
())
def
__str__
(
self
)
->
str
:
return
self
.
name
()
def
__repr__
(
self
)
->
str
:
return
self
.
__str__
()
+
" at "
+
hex
(
id
(
self
))
def
__dir_base__
(
self
):
return
[
x
for
x
in
self
.
__dir__
()
if
x
[:
2
]
!=
"__"
]
def
__deepcopy__
(
self
,
memo
):
return
softcopy
(
self
)
@property
def
npar
(
self
):
"""Value of npar."""
return
len
(
self
.
freeLocations
)
@property
def
nparBase
(
self
):
"""Value of npar."""
return
self
.
baseHF
.
npar
@property
def
nAs
(
self
):
"""Value of nAs."""
return
self
.
_nAs
@property
def
nbs
(
self
):
"""Value of nbs."""
return
self
.
_nbs
@property
def
nbsH
(
self
)
->
int
:
return
max
(
self
.
nbs
,
self
.
nAs
)
def
spacedim
(
self
):
return
self
.
As
[
0
]
.
shape
[
1
]
def
checkParameter
(
self
,
mu
:
paramList
):
return
checkParameter
(
mu
,
self
.
npar
)
def
checkParameterList
(
self
,
mu
:
paramList
):
return
checkParameterList
(
mu
,
self
.
npar
)
def
buildEnergyNormForm
(
self
):
self
.
baseHF
.
buildEnergyNormForm
()
self
.
energyNormMatrix
=
self
.
baseHF
.
energyNormMatrix
def
buildEnergyNormDualForm
(
self
):
self
.
baseHF
.
buildEnergyNormDualForm
()
self
.
energyNormDualMatrix
=
self
.
baseHF
.
energyNormDualMatrix
def
buildDualityPairingForm
(
self
):
self
.
baseHF
.
buildDualityPairingForm
()
self
.
dualityMatrix
=
self
.
baseHF
.
dualityMatrix
def
buildEnergyNormPartialDualForm
(
self
):
self
.
baseHF
.
buildEnergyNormPartialDualForm
()
self
.
energyNormPartialDualMatrix
=
(
self
.
baseHF
.
energyNormPartialDualMatrix
)
def
innerProduct
(
self
,
u
:
Np2D
,
v
:
Np2D
,
onlyDiag
:
bool
=
False
,
dual
:
bool
=
False
,
duality
:
bool
=
True
)
->
Np2D
:
return
self
.
baseHF
.
innerProduct
(
u
,
v
,
onlyDiag
,
dual
,
duality
)
def
norm
(
self
,
u
:
Np2D
,
dual
:
bool
=
False
,
duality
:
bool
=
True
)
->
Np1D
:
return
self
.
baseHF
.
norm
(
u
,
dual
,
duality
)
def
checkAInBounds
(
self
,
derI
:
int
=
0
):
"""Check if derivative index is oob for operator of linear system."""
if
derI
<
0
or
derI
>=
self
.
nAs
:
d
=
self
.
spacedim
()
return
scsp
.
csr_matrix
((
np
.
zeros
(
0
),
np
.
zeros
(
0
),
np
.
zeros
(
d
+
1
)),
shape
=
(
d
,
d
),
dtype
=
np
.
complex
)
def
checkbInBounds
(
self
,
derI
:
int
=
0
,
homogeneized
:
bool
=
False
):
"""Check if derivative index is oob for RHS of linear system."""
nbs
=
self
.
nbsH
if
homogeneized
else
self
.
nbs
if
derI
<
0
or
derI
>=
nbs
:
return
np
.
zeros
(
self
.
spacedim
(),
dtype
=
np
.
complex
)
def
_assembleA
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
,
derI
:
int
=
None
)
->
ScOp
:
"""Assemble (derivative of) operator of linear system."""
mu
=
self
.
checkParameter
(
mu
)
if
self
.
npar
>
0
:
mu
=
mu
[
0
]
if
not
hasattr
(
der
,
"__len__"
):
der
=
[
der
]
*
self
.
npar
if
derI
is
None
:
derI
=
hashD
(
der
)
Anull
=
self
.
checkAInBounds
(
derI
)
if
Anull
is
not
None
:
return
Anull
rExp
=
self
.
rescalingExpFree
A
=
copy
(
self
.
As
[
derI
])
for
j
in
range
(
derI
+
1
,
self
.
nAs
):
derIdx
=
hashI
(
j
,
self
.
npar
)
diffIdx
=
[
x
-
y
for
(
x
,
y
)
in
zip
(
derIdx
,
der
)]
if
np
.
all
([
x
>=
0
for
x
in
diffIdx
]):
A
=
A
+
(
np
.
prod
((
mu
**
rExp
)
**
diffIdx
)
*
multibinom
(
derIdx
,
diffIdx
)
*
self
.
As
[
j
])
return
A
def
A
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
)
->
ScOp
:
"""
Assemble terms of operator of linear system and return it (or its
derivative) at a given parameter.
"""
mu
=
self
.
checkParameter
(
mu
)
if
not
hasattr
(
der
,
"__len__"
):
der
=
[
der
]
*
self
.
npar
derI
=
hashD
(
der
)
return
self
.
_assembleA
(
mu
,
der
,
derI
)
def
_setupAs
(
self
):
self
.
As
=
[]
self
.
_nAs
=
0
for
j
in
range
(
self
.
baseHF
.
nAs
):
derjBase
=
hashI
(
j
,
self
.
nparBase
)
jNew
=
hashD
([
derjBase
[
i
]
for
i
in
self
.
freeLocations
])
derjFixed
=
[
derjBase
[
i
]
for
i
in
self
.
fixedLocations
]
A
=
(
np
.
prod
((
self
.
muFixed
**
self
.
rescalingExpFixed
)
**
derjFixed
)
*
self
.
baseHF
.
As
[
j
])
if
jNew
>=
self
.
_nAs
:
for
_
in
range
(
self
.
_nAs
,
jNew
):
self
.
As
+=
[
self
.
checkAInBounds
(
-
1
)]
self
.
As
+=
[
A
]
self
.
_nAs
=
jNew
+
1
else
:
self
.
As
[
jNew
]
=
self
.
As
[
jNew
]
+
A
def
affineLinearSystemA
(
self
,
mu
:
paramVal
=
[])
->
List
[
Np2D
]:
"""
Assemble affine blocks of operator of linear system (just linear
blocks).
"""
As
=
[
None
]
*
self
.
nAs
for
j
in
range
(
self
.
nAs
):
As
[
j
]
=
self
.
A
(
mu
,
hashI
(
j
,
self
.
npar
))
return
As
def
affineWeightsA
(
self
,
mu
:
paramVal
=
[])
->
List
[
str
]:
"""
Assemble affine blocks of operator of linear system (just affine
weights). Stored as strings for the sake of pickling.
"""
mu
=
self
.
checkParameter
(
mu
)
lambdasA
=
[
"1."
]
mu0Eff
=
mu
**
self
.
rescalingExpFree
for
j
in
range
(
1
,
self
.
nAs
):
lambdasA
+=
[
"np.prod((mu ** ({1}) - [{0}]) ** ({2}))"
.
format
(
','
.
join
([
str
(
x
)
for
x
in
mu0Eff
[
0
]]),
self
.
rescalingExpFree
,
hashI
(
j
,
self
.
npar
))]
return
lambdasA
def
affineBlocksA
(
self
,
mu
:
paramVal
=
[])
\
->
Tuple
[
List
[
Np2D
],
List
[
str
]]:
"""Assemble affine blocks of operator of linear system."""
return
self
.
affineLinearSystemA
(
mu
),
self
.
affineWeightsA
(
mu
)
def
_assembleb
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
,
derI
:
int
=
None
,
homogeneized
:
bool
=
False
)
->
ScOp
:
"""Assemble (derivative of) (homogeneized) RHS of linear system."""
mu
=
self
.
checkParameter
(
mu
)
if
self
.
npar
>
0
:
mu
=
mu
[
0
]
if
not
hasattr
(
der
,
"__len__"
):
der
=
[
der
]
*
self
.
npar
if
derI
is
None
:
derI
=
hashD
(
der
)
bnull
=
self
.
checkbInBounds
(
derI
,
homogeneized
)
if
bnull
is
not
None
:
return
bnull
bs
=
self
.
bsH
if
homogeneized
else
self
.
bs
rExp
=
self
.
rescalingExpFree
b
=
copy
(
bs
[
derI
])
for
j
in
range
(
derI
+
1
,
len
(
bs
)):
derIdx
=
hashI
(
j
,
self
.
npar
)
diffIdx
=
[
x
-
y
for
(
x
,
y
)
in
zip
(
derIdx
,
der
)]
if
np
.
all
([
x
>=
0
for
x
in
diffIdx
]):
b
=
b
+
(
np
.
prod
((
mu
**
rExp
)
**
diffIdx
)
*
multibinom
(
derIdx
,
diffIdx
)
*
bs
[
j
])
return
b
def
b
(
self
,
mu
:
paramVal
=
[],
der
:
List
[
int
]
=
0
,
homogeneized
:
bool
=
False
)
->
Np1D
:
"""
Assemble terms of (homogeneized) RHS of linear system and return it (or
its derivative) at a given parameter.
"""
mu
=
self
.
checkParameter
(
mu
)
if
not
hasattr
(
der
,
"__len__"
):
der
=
[
der
]
*
self
.
npar
derI
=
hashD
(
der
)
return
self
.
_assembleb
(
mu
,
der
,
derI
,
homogeneized
)
def
_setupbs
(
self
):
self
.
bs
=
[]
self
.
_nbs
=
0
for
j
in
range
(
self
.
baseHF
.
nbs
):
derjBase
=
hashI
(
j
,
self
.
nparBase
)
jNew
=
hashD
([
derjBase
[
i
]
for
i
in
self
.
freeLocations
])
derjFixed
=
[
derjBase
[
i
]
for
i
in
self
.
fixedLocations
]
b
=
(
np
.
prod
((
self
.
muFixed
**
self
.
rescalingExpFixed
)
**
derjFixed
)
*
self
.
baseHF
.
bs
[
j
])
if
jNew
>=
self
.
_nbs
:
for
_
in
range
(
self
.
_nbs
,
jNew
):
self
.
bs
+=
[
self
.
checkbInBounds
(
-
1
)]
self
.
bs
+=
[
b
]
self
.
_nbs
=
jNew
+
1
else
:
self
.
bs
[
jNew
]
=
self
.
bs
[
jNew
]
+
b
def
affineLinearSystemb
(
self
,
mu
:
paramVal
=
[],
homogeneized
:
bool
=
False
)
->
List
[
Np1D
]:
"""
Assemble affine blocks of RHS of linear system (just linear blocks).
"""
nbs
=
self
.
nbsH
if
homogeneized
else
self
.
nbs
bs
=
[
None
]
*
nbs
for
j
in
range
(
nbs
):
bs
[
j
]
=
self
.
b
(
mu
,
hashI
(
j
,
self
.
npar
),
homogeneized
)
return
bs
def
affineWeightsb
(
self
,
mu
:
paramVal
=
[],
homogeneized
:
bool
=
False
)
->
List
[
str
]:
"""
Assemble affine blocks of RHS of linear system (just affine weights).
Stored as strings for the sake of pickling.
"""
mu
=
self
.
checkParameter
(
mu
)
nbs
=
self
.
nbsH
if
homogeneized
else
self
.
nbs
lambdasb
=
[
"1."
]
mu0Eff
=
mu
**
self
.
rescalingExpFree
for
j
in
range
(
1
,
nbs
):
lambdasb
+=
[
"np.prod((mu ** ({1}) - [{0}]) ** ({2}))"
.
format
(
','
.
join
([
str
(
x
)
for
x
in
mu0Eff
[
0
]]),
self
.
rescalingExpFree
,
hashI
(
j
,
self
.
npar
))]
return
lambdasb
def
affineBlocksb
(
self
,
mu
:
paramVal
=
[],
homogeneized
:
bool
=
False
)
\
->
Tuple
[
List
[
Np1D
],
List
[
str
]]:
"""Assemble affine blocks of RHS of linear system."""
return
(
self
.
affineLinearSystemb
(
mu
,
homogeneized
),
self
.
affineWeightsb
(
mu
,
homogeneized
))
def
solve
(
self
,
mu
:
paramList
=
[],
RHS
:
sampList
=
None
,
homogeneized
:
bool
=
False
)
->
sampList
:
"""
Find solution of linear system.
Args:
mu: parameter value.
RHS: RHS of linear system. If None, defaults to that of parametric
system. Defaults to None.
"""
mu
,
_
=
self
.
checkParameterList
(
mu
)
if
mu
.
shape
[
0
]
==
0
:
mu
.
reset
((
1
,
self
.
npar
),
mu
.
dtype
)
sol
=
emptySampleList
()
if
len
(
mu
)
>
0
:
if
RHS
is
None
:
RHS
=
[
self
.
b
(
m
,
homogeneized
=
homogeneized
)
for
m
in
mu
]
RHS
=
sampleList
(
RHS
)
mult
=
0
if
len
(
RHS
)
==
1
else
1
RROMPyAssert
(
mult
*
(
len
(
mu
)
-
1
)
+
1
,
len
(
RHS
),
"Sample size"
)
u
=
self
.
baseHF
.
_solver
(
self
.
A
(
mu
[
0
]),
RHS
[
0
],
self
.
baseHF
.
_solverArgs
)
sol
.
reset
((
len
(
u
),
len
(
mu
)),
dtype
=
u
.
dtype
)
sol
[
0
]
=
u
for
j
in
range
(
1
,
len
(
mu
),
self
.
baseHF
.
_solveBatchSize
):
if
self
.
baseHF
.
_solveBatchSize
!=
1
:
iRange
=
list
(
range
(
j
,
min
(
j
+
self
.
baseHF
.
_solveBatchSize
,
len
(
mu
))))
As
=
[
self
.
A
(
mu
[
i
])
for
i
in
iRange
]
bs
=
[
RHS
[
mult
*
i
]
for
i
in
iRange
]
A
,
b
=
tensorizeLS
(
As
,
bs
)
else
:
A
,
b
=
self
.
A
(
mu
[
j
]),
RHS
[
mult
*
j
]
solStack
=
self
.
baseHF
.
_solver
(
A
,
b
,
self
.
baseHF
.
_solverArgs
)
if
self
.
baseHF
.
_solveBatchSize
!=
1
:
sol
[
iRange
]
=
detensorizeLS
(
solStack
,
len
(
iRange
))
else
:
sol
[
j
]
=
solStack
return
sol
def
residual
(
self
,
u
:
sampList
,
mu
:
paramList
=
[],
homogeneized
:
bool
=
False
,
duality
:
bool
=
True
)
->
sampList
:
"""
Find residual of linear system for given approximate solution.
Args:
u: numpy complex array with function dofs. If None, set to 0.
mu: parameter value.
"""
mu
,
_
=
self
.
checkParameterList
(
mu
)
if
mu
.
shape
[
0
]
==
0
:
mu
.
reset
((
1
,
self
.
npar
),
mu
.
dtype
)
if
u
is
not
None
:
u
=
sampleList
(
u
)
mult
=
0
if
len
(
u
)
==
1
else
1
RROMPyAssert
(
mult
*
(
len
(
mu
)
-
1
)
+
1
,
len
(
u
),
"Sample size"
)
res
=
emptySampleList
()
if
duality
and
not
hasattr
(
self
,
"dualityMatrix"
):
self
.
buildDualityPairingForm
()
for
j
in
range
(
len
(
mu
)):
b
=
self
.
b
(
mu
[
j
],
homogeneized
=
homogeneized
)
if
u
is
None
:
r
=
b
else
:
r
=
b
-
self
.
A
(
mu
[
j
])
.
dot
(
u
[
mult
*
j
])
if
j
==
0
:
res
.
reset
((
len
(
r
),
len
(
mu
)),
dtype
=
r
.
dtype
)
if
duality
:
r
=
self
.
dualityMatrix
.
dot
(
r
)
res
[
j
]
=
r
return
res
def
_rayleighQuotient
(
self
,
*
args
,
**
kwargs
)
->
float
:
return
self
.
baseHF
.
_rayleighQuotient
(
*
args
,
**
kwargs
)
def
stabilityFactor
(
self
,
u
:
sampList
,
mu
:
paramList
=
[],
nIterP
:
int
=
10
,
nIterR
:
int
=
10
)
->
sampList
:
"""
Find stability factor of matrix of linear system using iterative
inverse power iteration- and Rayleigh quotient-based procedure.
Args:
u: numpy complex arrays with function dofs.
mu: parameter values.
nIterP: number of iterations of power method.
nIterR: number of iterations of Rayleigh quotient method.
"""
mu
,
_
=
self
.
checkParameterList
(
mu
)
if
mu
.
shape
[
0
]
==
0
:
mu
.
reset
((
1
,
self
.
npar
),
mu
.
dtype
)
u
=
sampleList
(
u
)
mult
=
0
if
len
(
u
)
==
1
else
1
RROMPyAssert
(
mult
*
(
len
(
mu
)
-
1
)
+
1
,
len
(
u
),
"Sample size"
)
stabFact
=
np
.
empty
(
len
(
mu
),
dtype
=
float
)
if
not
hasattr
(
self
,
"energyNormMatrix"
):
self
.
buildEnergyNormForm
()
for
j
in
range
(
len
(
mu
)):
stabFact
[
j
]
=
self
.
_rayleighQuotient
(
self
.
A
(
mu
[
j
]),
u
[
mult
*
j
],
self
.
energyNormMatrix
,
0.
,
nIterP
,
nIterR
)
return
stabFact
def
plot
(
self
,
*
args
,
**
kwargs
):
"""
Do some nice plots of the complex-valued function with given dofs.
"""
self
.
baseHF
.
plot
(
*
args
,
**
kwargs
)
def
plotmesh
(
self
,
*
args
,
**
kwargs
):
"""
Do a nice plot of the mesh.
"""
self
.
baseHF
.
plotmesh
(
*
args
,
**
kwargs
)
def
outParaview
(
self
,
*
args
,
**
kwargs
):
"""
Output complex-valued function with given dofs to ParaView file.
"""
self
.
baseHF
.
outParaview
(
*
args
,
**
kwargs
)
def
outParaviewTimeDomain
(
self
,
*
args
,
**
kwargs
):
"""
Output complex-valued function with given dofs to ParaView file,
converted to time domain.
"""
self
.
baseHF
.
outParaviewTimeDomain
(
*
args
,
**
kwargs
)
Event Timeline
Log In to Comment