Page MenuHomec4science

sampling_engine_base_pivoted.py
No OneTemporary

File Metadata

Created
Thu, May 9, 06:59

sampling_engine_base_pivoted.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from rrompy.utilities.base.types import (Np1D, HFEng, List, ListAny, strLst,
paramVal, paramList, sampList)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import emptySampleList
from .sampling_engine_base import SamplingEngineBase
__all__ = ['SamplingEngineBasePivoted']
class SamplingEngineBasePivoted(SamplingEngineBase):
"""HERE"""
def __init__(self, HFEngine:HFEng, directionPivot:ListAny,
verbosity : int = 10, timestamp : bool = True,
allowRepeatedSamples : bool = True):
super().__init__(HFEngine, verbosity, timestamp)
self.directionPivot = directionPivot
self.HFEngineMarginalized = None
self.resetHistory()
@property
def directionMarginal(self):
return tuple([x for x in range(self.HFEngine.npar) \
if x not in self.directionPivot])
@property
def nPivot(self):
return len(self.directionPivot)
@property
def nMarginal(self):
return len(self.directionMarginal)
@property
def nsamplesTot(self):
return np.sum(self.nsamples)
def resetHistory(self, j : int = 1):
self.samples = [emptySampleList() for _ in range(j)]
self.nsamples = [0] * j
self.mus = [emptyParameterList() for _ in range(j)]
self._derIdxs = [[] for _ in range(j)]
def popSample(self, j:int):
if hasattr(self, "nsamples") and self.nsamples[j] > 1:
if self.samples[j].shape[1] > self.nsamples[j]:
RROMPyWarning(("More than 'nsamples' memory allocated for "
"samples. Popping empty sample column."))
self.nsamples[j] += 1
self.nsamples[j] -= 1
self.samples[j].pop()
self.mus[j].pop()
else:
self.resetHistory()
def preallocateSamples(self, u:sampList, mu:paramVal, n:int, j:int):
self.samples[j].reset((u.shape[0], n), u.dtype)
self.samples[j][0] = u
mu = checkParameter(mu, self.nPivot)
self.mus[j].reset((n, self.nPivot))
self.mus[j][0] = mu[0]
def coalesceSamples(self):
self.samplesCoalesced = emptySampleList()
self.samplesCoalesced.reset((self.samples[0].shape[0],
np.sum([samp.shape[1] \
for samp in self.samples])),
self.samples[0].dtype)
run_idx = 0
for samp in self.samples:
slen = samp.shape[1]
self.samplesCoalesced.data[:, run_idx : run_idx + slen] = samp.data
run_idx += slen
def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList:
"""
Solve linear system.
Args:
mu: Parameter value.
Returns:
Solution of system.
"""
mu = checkParameterList(mu, self.nPivot)[0]
vbMng(self, "INIT",
("Solving HF model for muPivot = {} and muMarginal = "
"{}.").format(mu, self.HFEngineMarginalized.muFixed), 15)
u = self.HFEngineMarginalized.solve(mu, RHS)
vbMng(self, "DEL", "Done solving HF model.", 15)
return u
def plotSamples(self, warping : List[callable] = None, name : str = "u",
save : str = None, what : strLst = 'all',
saveFormat : str = "eps", saveDPI : int = 100,
show : bool = True, plotArgs : dict = {}, **figspecs):
"""
Do some nice plots of the samples.
Args:
warping(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
plotArgs(optional): Optional arguments for fen/pyplot.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for i in range(len(self.nsamples)):
for j in range(self.nsamples[i]):
self.HFEngine.plot(self.samples[i][j], warping,
"{}_{}_{}".format(name, i, j), save, what,
saveFormat, saveDPI, show, plotArgs,
**figspecs)
def outParaviewSamples(self, name : str = "u", folders : bool = True,
filename : str = "out", times : Np1D = None,
what : strLst = 'all', forceNewFile : bool = True,
filePW = None):
"""
Output samples to ParaView file.
Args:
name(optional): Base name to be used for data output.
folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
times(optional): Timestamps.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
filePW(optional): Fenics File entity (for time series).
"""
if times is None: times = [[0.] * self.nsamples[i] \
for i in range(len(self.nsamples))]
for i in range(len(self.nsamples)):
for j in range(self.nsamples[i]):
self.HFEngine.outParaview(self.samples[i][j],
name = "{}_{}_{}".format(name, i, j),
filename = "{}_{}_{}".format(filename,
i, j),
time = times[i][j], what = what,
forceNewFile = forceNewFile,
folder = folders, filePW = filePW)
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
timeFinal : Np1D = None,
periodResolution : int = 20,
name : str = "u", folders : bool = True,
filename : str = "out",
forceNewFile : bool = True):
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
"""
if omegas is None: omegas = [[np.real(self.mus[i])] \
for i in range(len(self.nsamples))]
if not isinstance(timeFinal, (list, tuple,)):
timeFinal = [[timeFinal] * self.nsamples[i] \
for i in range(len(self.nsamples))]
for i in range(len(self.nsamples)):
for j in range(self.nsamples[i]):
self.HFEngine.outParaviewTimeDomain(self.samples[i][j],
omega = omegas[i][j],
timeFinal = timeFinal[i][j],
periodResolution = periodResolution,
name = "{}_{}_{}".format(name, i, j),
filename = "{}_{}_{}".format(filename,
i, j),
forceNewFile = forceNewFile,
folder = folders)

Event Timeline