Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F102585057
sampling_engine_standard.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Feb 22, 06:17
Size
14 KB
Mime Type
text/x-python
Expires
Mon, Feb 24, 06:17 (2 d)
Engine
blob
Format
Raw Data
Handle
24370192
Attached To
R6746 RationalROMPy
sampling_engine_standard.py
View Options
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from
numbers
import
Number
from
copy
import
deepcopy
as
copy
import
numpy
as
np
from
warnings
import
catch_warnings
from
rrompy.utilities.base.types
import
(
Np1D
,
Np2D
,
HFEng
,
List
,
paramVal
,
Any
,
paramList
,
sampList
,
Tuple
,
TupleAny
,
DictAny
,
FigHandle
)
from
rrompy.utilities.base.data_structures
import
getNewFilename
from
rrompy.utilities.base
import
verbosityManager
as
vbMng
from
rrompy.utilities.exception_manager
import
(
RROMPyWarning
,
RROMPyException
,
RROMPyAssert
,
RROMPy_READY
,
RROMPy_FRAGILE
)
from
rrompy.utilities.numerical
import
dot
from
rrompy.utilities.numerical.hash_derivative
import
nextDerivativeIndices
from
rrompy.utilities.base.pickle_utilities
import
pickleDump
,
pickleLoad
from
rrompy.parameter
import
(
emptyParameterList
,
checkParameter
,
checkParameterList
)
from
rrompy.sampling
import
sampleList
,
emptySampleList
from
rrompy.utilities.parallel
import
bcast
,
masterCore
__all__
=
[
'SamplingEngineStandard'
]
class
SamplingEngineStandard
:
def
__init__
(
self
,
HFEngine
:
HFEng
,
sample_state
:
bool
=
False
,
verbosity
:
int
=
10
,
timestamp
:
bool
=
True
,
scaleFactor
:
Np1D
=
None
):
self
.
sample_state
=
sample_state
self
.
verbosity
=
verbosity
self
.
timestamp
=
timestamp
vbMng
(
self
,
"INIT"
,
"Initializing sampling engine of type {}."
.
format
(
self
.
name
()),
10
)
self
.
HFEngine
=
HFEngine
vbMng
(
self
,
"DEL"
,
"Done initializing sampling engine."
,
10
)
self
.
scaleFactor
=
scaleFactor
def
name
(
self
)
->
str
:
return
self
.
__class__
.
__name__
def
__str__
(
self
)
->
str
:
return
self
.
name
()
def
__repr__
(
self
)
->
str
:
return
self
.
__str__
()
+
" at "
+
hex
(
id
(
self
))
@property
def
HFEngine
(
self
):
"""Value of HFEngine. Its assignment resets history."""
return
self
.
_HFEngine
@HFEngine.setter
def
HFEngine
(
self
,
HFEngine
):
self
.
_HFEngine
=
HFEngine
self
.
resetHistory
()
@property
def
scaleFactor
(
self
):
"""Value of scaleFactor."""
return
self
.
_scaleFactor
@scaleFactor.setter
def
scaleFactor
(
self
,
scaleFactor
):
if
scaleFactor
is
None
:
scaleFactor
=
1.
if
not
hasattr
(
scaleFactor
,
"__len__"
):
scaleFactor
=
[
scaleFactor
]
self
.
_scaleFactor
=
scaleFactor
def
scaleDer
(
self
,
derIdx
:
Np1D
):
if
not
isinstance
(
self
.
scaleFactor
,
Number
):
RROMPyAssert
(
len
(
derIdx
),
len
(
self
.
scaleFactor
),
"Number of derivative indices"
)
with
catch_warnings
(
record
=
True
)
as
w
:
res
=
np
.
prod
(
np
.
power
(
self
.
scaleFactor
,
derIdx
))
if
len
(
w
)
==
0
:
return
res
raise
RROMPyException
((
"Error in computing derivative scaling "
"factor: {}"
.
format
(
str
(
w
[
-
1
]
.
message
))))
@property
def
feature_keys
(
self
)
->
TupleAny
:
return
[
"mus"
,
"samples"
,
"nsamples"
,
"_derIdxs"
]
@property
def
feature_vals
(
self
)
->
DictAny
:
return
{
"mus"
:
self
.
mus
,
"samples"
:
self
.
samples
,
"nsamples"
:
self
.
nsamples
,
"_derIdxs"
:
self
.
_derIdxs
}
def
_mergeFeature
(
self
,
name
:
str
,
value
:
Any
):
if
name
in
[
"mus"
,
"samples"
]:
getattr
(
self
,
name
)
.
append
(
value
)
elif
name
==
"nsamples"
:
self
.
nsamples
+=
value
elif
name
==
"_derIdxs"
:
self
.
_derIdxs
+=
value
else
:
raise
RROMPyException
((
"Invalid key {} in sampling engine "
"merge."
.
format
(
key
)))
def
store
(
self
,
filenameBase
:
str
=
"sampling_engine"
,
forceNewFile
:
bool
=
True
,
local
:
bool
=
False
)
->
str
:
"""Store sampling engine to file."""
filename
=
None
if
masterCore
():
vbMng
(
self
,
"INIT"
,
"Storing sampling engine to file."
,
20
)
if
forceNewFile
:
filename
=
getNewFilename
(
filenameBase
,
"pkl"
)
else
:
filename
=
"{}.pkl"
.
format
(
filenameBase
)
pickleDump
(
self
.
feature_vals
,
filename
)
vbMng
(
self
,
"DEL"
,
"Done storing engine."
,
20
)
if
local
:
return
filename
=
bcast
(
filename
)
return
filename
def
load
(
self
,
filename
:
str
,
merge
:
bool
=
False
):
"""Load sampling engine from file."""
if
isinstance
(
filename
,
(
list
,
tuple
,)):
self
.
load
(
filename
[
0
],
merge
)
for
filen
in
filename
[
1
:]:
self
.
load
(
filen
,
True
)
return
vbMng
(
self
,
"INIT"
,
"Loading stored sampling engine from file."
,
20
)
datadict
=
pickleLoad
(
filename
)
for
key
in
datadict
:
if
key
in
self
.
feature_keys
:
if
merge
:
self
.
_mergeFeature
(
key
,
datadict
[
key
])
else
:
setattr
(
self
,
key
,
datadict
[
key
])
self
.
_mode
=
RROMPy_FRAGILE
vbMng
(
self
,
"DEL"
,
"Done loading stored engine."
,
20
)
@property
def
projectionMatrix
(
self
)
->
Np2D
:
return
self
.
samples
.
data
def
resetHistory
(
self
):
self
.
_mode
=
RROMPy_READY
self
.
samples
=
emptySampleList
()
self
.
nsamples
=
0
self
.
mus
=
emptyParameterList
()
self
.
_derIdxs
=
[]
def
setsample
(
self
,
u
:
sampList
,
overwrite
:
bool
=
False
):
if
overwrite
:
self
.
samples
[
self
.
nsamples
]
=
u
else
:
if
self
.
nsamples
==
0
:
self
.
samples
=
sampleList
(
u
)
else
:
self
.
samples
.
append
(
u
)
def
popSample
(
self
):
if
hasattr
(
self
,
"nsamples"
)
and
self
.
nsamples
>
1
:
if
self
.
samples
.
shape
[
1
]
>
self
.
nsamples
:
RROMPyWarning
((
"More than 'nsamples' memory allocated for "
"samples. Popping empty sample column."
))
self
.
nsamples
+=
1
self
.
nsamples
-=
1
self
.
samples
.
pop
()
self
.
mus
.
pop
()
else
:
self
.
resetHistory
()
def
preallocateSamples
(
self
,
u
:
sampList
,
mu
:
paramVal
,
n
:
int
):
self
.
_mode
=
RROMPy_READY
self
.
samples
.
reset
((
u
.
shape
[
0
],
n
),
u
.
dtype
)
self
.
samples
[
0
]
=
u
mu
=
checkParameter
(
mu
,
self
.
HFEngine
.
npar
)
self
.
mus
.
reset
((
n
,
self
.
HFEngine
.
npar
))
self
.
mus
[
0
]
=
mu
[
0
]
def
postprocessu
(
self
,
u
:
sampList
,
overwrite
:
bool
=
False
):
self
.
setsample
(
u
,
overwrite
)
def
postprocessuBulk
(
self
):
pass
def
solveLS
(
self
,
mu
:
paramList
=
[],
RHS
:
sampList
=
None
)
->
sampList
:
"""
Solve linear system.
Args:
mu: Parameter value.
Returns:
Solution of system.
"""
mu
=
checkParameterList
(
mu
,
self
.
HFEngine
.
npar
)[
0
]
vbMng
(
self
,
"INIT"
,
"Solving HF model for mu = {}."
.
format
(
mu
),
15
)
u
=
self
.
HFEngine
.
solve
(
mu
,
RHS
,
return_state
=
self
.
sample_state
)
vbMng
(
self
,
"DEL"
,
"Done solving HF model."
,
15
)
return
u
def
_getSampleConcurrence
(
self
,
mu
:
paramVal
,
previous
:
Np1D
)
->
sampList
:
RROMPyAssert
(
self
.
_mode
,
message
=
"Cannot add samples."
)
if
not
(
self
.
sample_state
or
self
.
HFEngine
.
isCEye
):
raise
RROMPyException
((
"Derivatives of solution with non-scalar "
"C not computable."
))
if
len
(
previous
)
>=
len
(
self
.
_derIdxs
):
self
.
_derIdxs
+=
nextDerivativeIndices
(
self
.
_derIdxs
,
len
(
self
.
scaleFactor
),
len
(
previous
)
+
1
-
len
(
self
.
_derIdxs
))
derIdx
=
self
.
_derIdxs
[
len
(
previous
)]
mu
=
checkParameter
(
mu
,
self
.
HFEngine
.
npar
)
samplesOld
=
self
.
samples
(
previous
)
RHS
=
self
.
scaleDer
(
derIdx
)
*
self
.
HFEngine
.
b
(
mu
,
derIdx
)
for
j
,
derP
in
enumerate
(
self
.
_derIdxs
[:
len
(
previous
)]):
diffP
=
[
x
-
y
for
(
x
,
y
)
in
zip
(
derIdx
,
derP
)]
if
np
.
all
([
x
>=
0
for
x
in
diffP
]):
RHS
-=
self
.
scaleDer
(
diffP
)
*
dot
(
self
.
HFEngine
.
A
(
mu
,
diffP
),
samplesOld
[
j
])
return
self
.
solveLS
(
mu
,
RHS
=
RHS
)
def
nextSample
(
self
,
mu
:
paramVal
,
overwrite
:
bool
=
False
,
postprocess
:
bool
=
True
)
->
Np1D
:
RROMPyAssert
(
self
.
_mode
,
message
=
"Cannot add samples."
)
mu
=
checkParameter
(
mu
,
self
.
HFEngine
.
npar
)
muidxs
=
self
.
mus
.
findall
(
mu
[
0
])
if
len
(
muidxs
)
>
0
:
u
=
self
.
_getSampleConcurrence
(
mu
,
np
.
sort
(
muidxs
))
else
:
u
=
self
.
solveLS
(
mu
)
if
postprocess
:
self
.
postprocessu
(
u
,
overwrite
=
overwrite
)
else
:
self
.
setsample
(
u
,
overwrite
)
if
overwrite
:
self
.
mus
[
self
.
nsamples
]
=
mu
[
0
]
else
:
self
.
mus
.
append
(
mu
)
self
.
nsamples
+=
1
return
self
.
samples
[
self
.
nsamples
-
1
]
def
iterSample
(
self
,
mus
:
paramList
)
->
sampList
:
mus
=
checkParameterList
(
mus
,
self
.
HFEngine
.
npar
)[
0
]
vbMng
(
self
,
"INIT"
,
"Starting sampling iterations."
,
5
)
n
=
len
(
mus
)
if
n
<=
0
:
raise
RROMPyException
((
"Number of samples must be positive."
))
self
.
resetHistory
()
if
len
(
mus
.
unique
())
!=
n
:
for
j
in
range
(
n
):
vbMng
(
self
,
"MAIN"
,
"Computing sample {} / {}."
.
format
(
j
+
1
,
n
),
7
)
self
.
nextSample
(
mus
[
j
],
overwrite
=
(
j
>
0
),
postprocess
=
False
)
if
n
>
1
and
j
==
0
:
self
.
preallocateSamples
(
self
.
samples
[
0
],
mus
[
0
],
n
)
else
:
self
.
setsample
(
self
.
solveLS
(
mus
),
overwrite
=
False
)
self
.
mus
=
copy
(
mus
)
self
.
nsamples
=
n
self
.
postprocessuBulk
()
vbMng
(
self
,
"DEL"
,
"Finished sampling iterations."
,
5
)
return
self
.
samples
def
plotSamples
(
self
,
warpings
:
List
[
List
[
callable
]]
=
None
,
name
:
str
=
"u"
,
**
kwargs
)
->
Tuple
[
List
[
FigHandle
],
List
[
str
]]:
"""
Do some nice plots of the samples.
Args:
warpings(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
Returns:
Output filenames and figure handles.
"""
if
warpings
is
None
:
warpings
=
[
None
]
*
self
.
nsamples
figs
=
[
None
]
*
self
.
nsamples
filesOut
=
[
None
]
*
self
.
nsamples
for
j
in
range
(
self
.
nsamples
):
pltOut
=
self
.
HFEngine
.
plot
(
self
.
samples
[
j
],
warpings
[
j
],
self
.
sample_state
,
"{}_{}"
.
format
(
name
,
j
),
**
kwargs
)
if
isinstance
(
pltOut
,
(
tuple
,)):
figs
[
j
],
filesOut
[
j
]
=
pltOut
else
:
figs
[
j
]
=
pltOut
if
filesOut
[
0
]
is
None
:
return
figs
return
figs
,
filesOut
def
outParaviewSamples
(
self
,
warpings
:
List
[
List
[
callable
]]
=
None
,
name
:
str
=
"u"
,
filename
:
str
=
"out"
,
times
:
Np1D
=
None
,
**
kwargs
)
->
List
[
str
]:
"""
Output samples to ParaView file.
Args:
warpings(optional): Domain warping functions.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
times(optional): Timestamps.
Returns:
Output filenames.
"""
if
warpings
is
None
:
warpings
=
[
None
]
*
self
.
nsamples
if
times
is
None
:
times
=
[
0.
]
*
self
.
nsamples
filesOut
=
[
None
]
*
self
.
nsamples
for
j
in
range
(
self
.
nsamples
):
filesOut
[
j
]
=
self
.
HFEngine
.
outParaview
(
self
.
samples
[
j
],
warpings
[
j
],
self
.
sample_state
,
"{}_{}"
.
format
(
name
,
j
),
"{}_{}"
.
format
(
filename
,
j
),
times
[
j
],
**
kwargs
)
if
filesOut
[
0
]
is
None
:
return
None
return
filesOut
def
outParaviewTimeDomainSamples
(
self
,
omegas
:
Np1D
=
None
,
warpings
:
List
[
List
[
callable
]]
=
None
,
timeFinal
:
Np1D
=
None
,
periodResolution
:
List
[
int
]
=
20
,
name
:
str
=
"u"
,
filename
:
str
=
"out"
,
**
kwargs
)
->
List
[
str
]:
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
Returns:
Output filename.
"""
if
omegas
is
None
:
omegas
=
np
.
real
(
self
.
mus
)
if
warpings
is
None
:
warpings
=
[
None
]
*
self
.
nsamples
if
not
isinstance
(
timeFinal
,
(
list
,
tuple
,)):
timeFinal
=
[
timeFinal
]
*
self
.
nsamples
if
not
isinstance
(
periodResolution
,
(
list
,
tuple
,)):
periodResolution
=
[
periodResolution
]
*
self
.
nsamples
filesOut
=
[
None
]
*
self
.
nsamples
for
j
in
range
(
self
.
nsamples
):
filesOut
[
j
]
=
self
.
HFEngine
.
outParaviewTimeDomain
(
self
.
samples
[
j
],
omegas
[
j
],
warpings
[
j
],
self
.
sample_state
,
timeFinal
[
j
],
periodResolution
[
j
],
"{}_{}"
.
format
(
name
,
j
),
"{}_{}"
.
format
(
filename
,
j
),
**
kwargs
)
if
filesOut
[
0
]
is
None
:
return
None
return
filesOut
Event Timeline
Log In to Comment