Page MenuHomec4science

sampling_engine_base_pivoted.py
No OneTemporary

File Metadata

Created
Fri, May 10, 03:01

sampling_engine_base_pivoted.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from rrompy.utilities.base.types import (Np1D, HFEng, List, ListAny, paramVal,
paramList, sampList, Tuple, FigHandle)
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import emptySampleList
from .sampling_engine_base import SamplingEngineBase
# Names exported by 'from <module> import *'.
__all__ = ['SamplingEngineBasePivoted']
class SamplingEngineBasePivoted(SamplingEngineBase):
    """
    Sampling engine whose history is split into several groups.

    The parameter directions listed in 'directionPivot' are the pivot
    directions; every other direction of the HF engine is a marginal
    direction (see 'directionMarginal').

    Attributes:
        directionPivot: Indices of the pivot parameter directions.
        HFEngineMarginalized: Engine used by 'solveLS'. Initialized to None;
            it must be assigned elsewhere before 'solveLS' is called.
        samples: List of sample lists, one per group.
        nsamples: List with the number of samples stored in each group.
        mus: List of parameter lists, one per group.
    """

    def __init__(self, HFEngine:HFEng, directionPivot:ListAny,
                 *args, **kwargs):
        """
        Set up engine.

        Args:
            HFEngine: HF engine, forwarded to the parent constructor.
            directionPivot: Indices of the pivot parameter directions.
            *args, **kwargs: Forwarded to the parent constructor.
        """
        super().__init__(HFEngine, *args, **kwargs)
        self.directionPivot = directionPivot
        self.HFEngineMarginalized = None
        self.resetHistory()

    @property
    def directionMarginal(self):
        """Indices of the HF engine parameters that are not pivot."""
        return tuple(x for x in range(self.HFEngine.npar)
                     if x not in self.directionPivot)

    @property
    def nPivot(self):
        """Number of pivot directions."""
        return len(self.directionPivot)

    @property
    def nMarginal(self):
        """Number of marginal directions."""
        return len(self.directionMarginal)

    @property
    def nsamplesTot(self):
        """Total number of samples over all groups."""
        return np.sum(self.nsamples)

    @staticmethod
    def _firstEntry(nested):
        """Return first item of a list of lists, or None if there is none."""
        return next((x for sub in nested for x in sub), None)

    def resetHistory(self, j : int = 1):
        """
        Discard stored history and create j empty groups.

        Args:
            j(optional): Number of groups to create. Defaults to 1.
        """
        self.samples = [emptySampleList() for _ in range(j)]
        self.nsamples = [0] * j
        self.mus = [emptyParameterList() for _ in range(j)]
        self._derIdxs = [[] for _ in range(j)]

    def popSample(self, j:int):
        """
        Pop one sample (and its parameter) from group j.

        If group j holds at most one sample, or no history exists yet, the
        history is reset instead.

        Args:
            j: Index of the group.
        """
        if hasattr(self, "nsamples") and self.nsamples[j] > 1:
            if self.samples[j].shape[1] > self.nsamples[j]:
                RROMPyWarning(("More than 'nsamples' memory allocated for "
                               "samples. Popping empty sample column."))
                # the popped column is empty: offset the decrement below
                self.nsamples[j] += 1
            self.nsamples[j] -= 1
            self.samples[j].pop()
            self.mus[j].pop()
        else:
            # NOTE(review): this resets ALL groups to a single empty one, not
            # only group j -- confirm this is intended.
            self.resetHistory()

    def preallocateSamples(self, u:sampList, mu:paramVal, n:int, j:int):
        """
        Reset storage of group j for n samples and store the first one.

        Args:
            u: First sample, stored at position 0.
            mu: Parameter value of the first sample, stored at position 0.
            n: Number of samples to allocate for.
            j: Index of the group.
        """
        self.samples[j].reset((u.shape[0], n), u.dtype)
        self.samples[j][0] = u
        mu = checkParameter(mu, self.nPivot)
        self.mus[j].reset((n, self.nPivot))
        self.mus[j][0] = mu[0]

    def coalesceSamples(self):
        """
        Concatenate the samples of all groups, column-wise and in group
        order, into the new attribute 'samplesCoalesced'.
        """
        vbMng(self, "INIT", "Coalescing samples.", 7)
        self.samplesCoalesced = emptySampleList()
        self.samplesCoalesced.reset((self.samples[0].shape[0],
                                     np.sum([samp.shape[1] \
                                             for samp in self.samples])),
                                    self.samples[0].dtype)
        run_idx = 0
        for samp in self.samples:
            slen = samp.shape[1]
            self.samplesCoalesced.data[:, run_idx : run_idx + slen] = samp.data
            run_idx += slen
        vbMng(self, "DEL", "Done coalescing samples.", 7)

    def solveLS(self, mu : paramList = None,
                RHS : sampList = None) -> sampList:
        """
        Solve linear system.

        Args:
            mu(optional): Pivot parameter value. Defaults to an empty list.
            RHS(optional): Forwarded to the solve method of
                'HFEngineMarginalized'. Defaults to None.

        Returns:
            Solution of system.
        """
        # None stands in for the former mutable default '[]'
        if mu is None: mu = []
        mu = checkParameterList(mu, self.nPivot)[0]
        vbMng(self, "INIT",
              ("Solving HF model for muPivot = {} and muMarginal = "
               "{}.").format(mu, self.HFEngineMarginalized.muFixed), 15)
        u = self.HFEngineMarginalized.solve(mu, RHS,
                                            return_state = self.sample_state,
                                            verbose = (self.verbosity >= 20))
        vbMng(self, "DEL", "Done solving HF model.", 15)
        return u

    def plotSamples(self, warpings : List[List[List[callable]]] = None,
                    name : str = "u",
                    **kwargs) -> Tuple[List[List[FigHandle]], List[List[str]]]:
        """
        Do some nice plots of the samples.

        Args:
            warpings(optional): Domain warping functions.
            name(optional): Name to be shown as title of the plots. Defaults to
                'u'.

        Returns:
            Figure handles alone if no output filename is available, otherwise
            figure handles and output filenames.
        """
        if warpings is None:
            warpings = [[None] * n for n in self.nsamples]
        figs = []
        filesOut = []
        for i, n in enumerate(self.nsamples):
            figsi = [None] * n
            filesOuti = [None] * n
            for j in range(n):
                pltOut = self.HFEngine.plot(self.samples[i][j], warpings[i][j],
                                            self.sample_state,
                                            "{}_{}_{}".format(name, i, j),
                                            **kwargs)
                if isinstance(pltOut, tuple):
                    figsi[j], filesOuti[j] = pltOut
                else:
                    figsi[j] = pltOut
            figs.append(figsi)
            filesOut.append(filesOuti)
        # look at the first existing entry, so that empty groups do not raise
        if self._firstEntry(filesOut) is None: return figs
        return figs, filesOut

    def outParaviewSamples(self, warpings : List[List[List[callable]]] = None,
                           name : str = "u", filename : str = "out",
                           times : List[Np1D] = None,
                           **kwargs) -> List[List[str]]:
        """
        Output samples to ParaView file.

        Args:
            warpings(optional): Domain warping functions.
            name(optional): Base name to be used for data output.
            filename(optional): Name of output file.
            times(optional): Timestamps. Defaults to 0. for every sample.

        Returns:
            Output filenames, or None if no filename is available.
        """
        if warpings is None:
            warpings = [[None] * n for n in self.nsamples]
        if times is None:
            times = [[0.] * n for n in self.nsamples]
        filesOut = []
        for i, n in enumerate(self.nsamples):
            filesOuti = [None] * n
            for j in range(n):
                filesOuti[j] = self.HFEngine.outParaview(self.samples[i][j],
                                             warpings[i][j], self.sample_state,
                                             "{}_{}_{}".format(name, i, j),
                                             "{}_{}_{}".format(filename, i, j),
                                             times[i][j], **kwargs)
            filesOut.append(filesOuti)
        # look at the first existing entry, so that empty groups do not raise
        if self._firstEntry(filesOut) is None: return None
        return filesOut

    def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
                                  warpings : List[List[List[callable]]] = None,
                                  timeFinal : Np1D = None,
                                  periodResolution : List[List[int]] = 20,
                                  name : str = "u", filename : str = "out",
                                  **kwargs) -> List[List[str]]:
        """
        Output samples to ParaView file, converted to time domain.

        Args:
            omegas(optional): frequencies. Defaults to the real part of the
                stored parameters.
            warpings(optional): Domain warping functions.
            timeFinal(optional): final time of simulation. A value that is not
                a list or tuple is used for every sample.
            periodResolution(optional): number of time steps per period. A
                value that is not a list or tuple is used for every sample.
                Defaults to 20.
            name(optional): Base name to be used for data output.
            filename(optional): Name of output file.

        Returns:
            Output filenames, or None if no filename is available.
        """
        if omegas is None:
            omegas = [np.real(self.mus[i]) \
                      for i in range(len(self.nsamples))]
        if warpings is None:
            warpings = [[None] * n for n in self.nsamples]
        if not isinstance(timeFinal, (list, tuple,)):
            timeFinal = [[timeFinal] * n for n in self.nsamples]
        if not isinstance(periodResolution, (list, tuple,)):
            periodResolution = [[periodResolution] * n \
                                for n in self.nsamples]
        filesOut = []
        for i, n in enumerate(self.nsamples):
            filesOuti = [None] * n
            for j in range(n):
                filesOuti[j] = self.HFEngine.outParaviewTimeDomain(
                                             self.samples[i][j], omegas[i][j],
                                             warpings[i][j], self.sample_state,
                                             timeFinal[i][j],
                                             periodResolution[i][j],
                                             "{}_{}_{}".format(name, i, j),
                                             "{}_{}_{}".format(filename, i, j),
                                             **kwargs)
            filesOut.append(filesOuti)
        # look at the first existing entry, so that empty groups do not raise
        if self._firstEntry(filesOut) is None: return None
        return filesOut

Event Timeline