Page MenuHomec4science

sampling_engine_standard.py
No OneTemporary

File Metadata

Created
Sat, May 4, 03:42

sampling_engine_standard.py

# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see <http://www.gnu.org/licenses/>.
#
from numbers import Number
from copy import deepcopy as copy
import numpy as np
from warnings import catch_warnings
from rrompy.utilities.base.types import (Np1D, Np2D, HFEng, List, paramVal,
Any, paramList, sampList, Tuple,
TupleAny, DictAny, FigHandle)
from rrompy.utilities.base.data_structures import getNewFilename
from rrompy.utilities.base import verbosityManager as vbMng
from rrompy.utilities.exception_manager import (RROMPyWarning, RROMPyException,
RROMPyAssert, RROMPy_READY,
RROMPy_FRAGILE)
from rrompy.utilities.numerical.hash_derivative import nextDerivativeIndices
from rrompy.utilities.base.pickle_utilities import pickleDump, pickleLoad
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import sampleList, emptySampleList
from rrompy.utilities.parallel import bcast, masterCore
__all__ = ['SamplingEngineStandard']
class SamplingEngineStandard:
def __init__(self, HFEngine:HFEng, sample_state : bool = False,
verbosity : int = 10, timestamp : bool = True,
scaleFactor : Np1D = None):
self.sample_state = sample_state
self.verbosity = verbosity
self.timestamp = timestamp
vbMng(self, "INIT",
"Initializing sampling engine of type {}.".format(self.name()),
10)
self.HFEngine = HFEngine
vbMng(self, "DEL", "Done initializing sampling engine.", 10)
self.scaleFactor = scaleFactor
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
@property
def HFEngine(self):
"""Value of HFEngine. Its assignment resets history."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
self._HFEngine = HFEngine
self.resetHistory()
@property
def scaleFactor(self):
"""Value of scaleFactor."""
return self._scaleFactor
@scaleFactor.setter
def scaleFactor(self, scaleFactor):
if scaleFactor is None: scaleFactor = 1.
if not hasattr(scaleFactor, "__len__"): scaleFactor = [scaleFactor]
self._scaleFactor = scaleFactor
def scaleDer(self, derIdx:Np1D):
if not isinstance(self.scaleFactor, Number):
RROMPyAssert(len(derIdx), len(self.scaleFactor),
"Number of derivative indices")
with catch_warnings(record = True) as w:
res = np.prod(np.power(self.scaleFactor, derIdx))
if len(w) == 0: return res
raise RROMPyException(("Error in computing derivative scaling "
"factor: {}".format(str(w[-1].message))))
@property
def feature_keys(self) -> TupleAny:
return ["mus", "samples", "nsamples", "_derIdxs"]
@property
def feature_vals(self) -> DictAny:
return {"mus":self.mus, "samples":self.samples,
"nsamples":self.nsamples, "_derIdxs":self._derIdxs,
"_scaleFactor":self.scaleFactor}
def _mergeFeature(self, name:str, value:Any):
if name in ["mus", "samples"]:
getattr(self, name).append(value)
elif name == "nsamples":
self.nsamples += value
elif name == "_derIdxs":
self._derIdxs += value
else:
raise RROMPyException(("Invalid key {} in sampling engine "
"merge.".format(name)))
def store(self, filenameBase : str = "sampling_engine",
forceNewFile : bool = True, local : bool = False) -> str:
"""Store sampling engine to file."""
filename = None
if masterCore():
vbMng(self, "INIT", "Storing sampling engine to file.", 20)
if forceNewFile:
filename = getNewFilename(filenameBase, "pkl")
else:
filename = "{}.pkl".format(filenameBase)
pickleDump(self.feature_vals, filename)
vbMng(self, "DEL", "Done storing engine.", 20)
if local: return
filename = bcast(filename)
return filename
def load(self, filename:str, merge : bool = False):
"""Load sampling engine from file."""
if isinstance(filename, (list, tuple,)):
self.load(filename[0], merge)
for filen in filename[1 :]: self.load(filen, True)
return
vbMng(self, "INIT", "Loading stored sampling engine from file.", 20)
datadict = pickleLoad(filename)
for key in datadict:
if key in self.feature_keys:
if merge and key != "_scaleFactor":
self._mergeFeature(key, datadict[key])
else:
setattr(self, key, datadict[key])
self._mode = RROMPy_FRAGILE
vbMng(self, "DEL", "Done loading stored engine.", 20)
@property
def projectionMatrix(self) -> Np2D:
return self.samples.data
def resetHistory(self):
self._mode = RROMPy_READY
self.samples = emptySampleList()
self.nsamples = 0
self.mus = emptyParameterList()
self._derIdxs = []
def setsample(self, u:sampList, overwrite : bool = False):
if overwrite:
self.samples[self.nsamples] = u
else:
if self.nsamples == 0:
self.samples = sampleList(u)
else:
self.samples.append(u)
def popSample(self):
if hasattr(self, "nsamples") and self.nsamples > 1:
if self.samples.shape[1] > self.nsamples:
RROMPyWarning(("More than 'nsamples' memory allocated for "
"samples. Popping empty sample column."))
self.nsamples += 1
self.nsamples -= 1
self.samples.pop()
self.mus.pop()
else:
self.resetHistory()
def preallocateSamples(self, u:sampList, mu:paramVal, n:int):
self._mode = RROMPy_READY
self.samples.reset((u.shape[0], n), u.dtype)
self.samples[0] = u
mu = checkParameter(mu, self.HFEngine.npar)
self.mus.reset((n, self.HFEngine.npar))
self.mus[0] = mu[0]
def postprocessu(self, u:sampList, overwrite : bool = False):
self.setsample(u, overwrite)
def postprocessuBulk(self):
pass
def solveLS(self, mu : paramList = [], RHS : sampList = None) -> sampList:
"""
Solve linear system.
Args:
mu: Parameter value.
Returns:
Solution of system.
"""
mu = checkParameterList(mu, self.HFEngine.npar)
vbMng(self, "INIT", "Solving HF model for mu = {}.".format(mu), 15)
u = self.HFEngine.solve(mu, RHS, return_state = self.sample_state)
vbMng(self, "DEL", "Done solving HF model.", 15)
return u
def _getSampleConcurrence(self, mu:paramVal, previous:Np1D) -> sampList:
RROMPyAssert(self._mode, message = "Cannot add samples.")
if not (self.sample_state or self.HFEngine.isCEye):
raise RROMPyException(("Derivatives of solution with non-scalar "
"C not computable."))
from rrompy.utilities.numerical import dot
if len(previous) >= len(self._derIdxs):
self._derIdxs += nextDerivativeIndices(self._derIdxs,
len(self.scaleFactor),
len(previous) + 1 - len(self._derIdxs))
derIdx = self._derIdxs[len(previous)]
mu = checkParameter(mu, self.HFEngine.npar)
samplesOld = self.samples(previous)
RHS = self.scaleDer(derIdx) * self.HFEngine.b(mu, derIdx)
for j, derP in enumerate(self._derIdxs[: len(previous)]):
diffP = [x - y for (x, y) in zip(derIdx, derP)]
if np.all([x >= 0 for x in diffP]):
RHS -= self.scaleDer(diffP) * dot(self.HFEngine.A(mu, diffP),
samplesOld[j])
return self.solveLS(mu, RHS = RHS)
def nextSample(self, mu:paramVal, overwrite : bool = False,
postprocess : bool = True) -> Np1D:
RROMPyAssert(self._mode, message = "Cannot add samples.")
mu = checkParameter(mu, self.HFEngine.npar)
muidxs = self.mus.findall(mu[0])
if len(muidxs) > 0:
u = self._getSampleConcurrence(mu, np.sort(muidxs))
else:
u = self.solveLS(mu)
if postprocess:
self.postprocessu(u, overwrite = overwrite)
else:
self.setsample(u, overwrite)
if overwrite:
self.mus[self.nsamples] = mu[0]
else:
self.mus.append(mu)
self.nsamples += 1
return self.samples[self.nsamples - 1]
def iterSample(self, mus:paramList) -> sampList:
mus = checkParameterList(mus, self.HFEngine.npar)
vbMng(self, "INIT", "Starting sampling iterations.", 5)
n = len(mus)
if n <= 0:
raise RROMPyException(("Number of samples must be positive."))
self.resetHistory()
if len(mus.unique()) != n:
for j in range(n):
vbMng(self, "MAIN",
"Computing sample {} / {}.".format(j + 1, n), 7)
self.nextSample(mus[j], overwrite = (j > 0),
postprocess = False)
if n > 1 and j == 0:
self.preallocateSamples(self.samples[0], mus[0], n)
else:
self.setsample(self.solveLS(mus), overwrite = False)
self.mus = copy(mus)
self.nsamples = n
self.postprocessuBulk()
vbMng(self, "DEL", "Finished sampling iterations.", 5)
return self.samples
def plotSamples(self, warpings : List[List[callable]] = None,
name : str = "u",
**kwargs) -> Tuple[List[FigHandle], List[str]]:
"""
Do some nice plots of the samples.
Args:
warpings(optional): Domain warping functions.
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
Returns:
Output filenames and figure handles.
"""
if warpings is None: warpings = [None] * self.nsamples
figs = [None] * self.nsamples
filesOut = [None] * self.nsamples
for j in range(self.nsamples):
pltOut = self.HFEngine.plot(self.samples[j], warpings[j],
self.sample_state,
"{}_{}".format(name, j), **kwargs)
if isinstance(pltOut, (tuple,)):
figs[j], filesOut[j] = pltOut
else:
figs[j] = pltOut
if filesOut[0] is None: return figs
return figs, filesOut
def outParaviewSamples(self, warpings : List[List[callable]] = None,
name : str = "u", filename : str = "out",
times : Np1D = None, **kwargs) -> List[str]:
"""
Output samples to ParaView file.
Args:
warpings(optional): Domain warping functions.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
times(optional): Timestamps.
Returns:
Output filenames.
"""
if warpings is None: warpings = [None] * self.nsamples
if times is None: times = [0.] * self.nsamples
filesOut = [None] * self.nsamples
for j in range(self.nsamples):
filesOut[j] = self.HFEngine.outParaview(
self.samples[j], warpings[j], self.sample_state,
"{}_{}".format(name, j), "{}_{}".format(filename, j),
times[j], **kwargs)
if filesOut[0] is None: return None
return filesOut
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
warpings : List[List[callable]] = None,
timeFinal : Np1D = None,
periodResolution : List[int] = 20,
name : str = "u", filename : str = "out",
**kwargs) -> List[str]:
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
filename(optional): Name of output file.
Returns:
Output filename.
"""
if omegas is None: omegas = np.real(self.mus)
if warpings is None: warpings = [None] * self.nsamples
if not isinstance(timeFinal, (list, tuple,)):
timeFinal = [timeFinal] * self.nsamples
if not isinstance(periodResolution, (list, tuple,)):
periodResolution = [periodResolution] * self.nsamples
filesOut = [None] * self.nsamples
for j in range(self.nsamples):
filesOut[j] = self.HFEngine.outParaviewTimeDomain(
self.samples[j], omegas[j], warpings[j],
self.sample_state, timeFinal[j],
periodResolution[j], "{}_{}".format(name, j),
"{}_{}".format(filename, j), **kwargs)
if filesOut[0] is None: return None
return filesOut

Event Timeline