diff --git a/rrompy/sampling/base/pod_engine.py b/rrompy/sampling/base/pod_engine.py
index a81aed1..1df7cf3 100644
--- a/rrompy/sampling/base/pod_engine.py
+++ b/rrompy/sampling/base/pod_engine.py
@@ -1,149 +1,149 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from copy import deepcopy as copy
-from rrompy.utilities.base.types import Np1D, Tuple, HFEng, sampList
+from rrompy.utilities.base.types import Np1D, Np2D, Tuple, HFEng, sampList
from rrompy.sampling import sampleList
from rrompy.utilities.exception_manager import RROMPyException
__all__ = ['PODEngine']
class PODEngine:
"""
POD engine for general matrix orthogonalization.
"""
def __init__(self, HFEngine:HFEng):
self.HFEngine = HFEngine
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def GS(self, a:Np1D, Q:sampList, n : int = None,
aA:Np1D = None, QA:sampList = None) -> Tuple[Np1D, Np1D, Np1D]:
"""
Compute 1 Gram-Schmidt step with given projector.
Args:
a: vector to be projected;
Q: orthogonal projection matrix;
n: number of columns of Q to be considered;
aA: augmented components of vector to be projected;
QA: augmented components of projection matrix.
Returns:
Resulting normalized vector, coefficients of a wrt the updated
basis.
"""
if n is None:
n = Q.shape[1]
if aA is None != QA is None:
raise RROMPyException(("Either both or none of augmented "
"components must be provided."))
r = np.zeros((n + 1,), dtype = a.dtype)
if n > 0:
Q = Q[: n]
for j in range(2): # twice is enough!
nu = self.HFEngine.innerProduct(a, Q)
a = a - Q.dot(nu)
if aA is not None:
aA = aA - QA.dot(nu)
r[:-1] = r[:-1] + nu.flatten()
r[-1] = self.HFEngine.norm(a)
if np.isclose(np.abs(r[-1]), 0.):
r[-1] = 1.
a = a / r[-1]
if aA is not None:
aA = aA / r[-1]
return a, r, aA
def QRGramSchmidt(self, A:sampList,
- only_R : bool = False) -> Tuple[sampList, Np1D]:
+ only_R : bool = False) -> Tuple[sampList, Np2D]:
"""
Compute QR decomposition of a matrix through Gram-Schmidt method.
Args:
A: matrix to be decomposed;
only_R(optional): whether to skip reconstruction of Q; defaults to
False.
Returns:
Resulting orthogonal and upper-triangular factors.
"""
N = A.shape[1]
Q = copy(A)
R = np.zeros((N, N), dtype = A.dtype)
for k in range(N):
Q[k], R[: k + 1, k], _ = self.GS(A[k], Q, k)
if only_R:
return R
return Q, R
def QRHouseholder(self, A:sampList, Q0 : sampList = None,
- only_R : bool = False) -> Tuple[sampList, Np1D]:
+ only_R : bool = False) -> Tuple[sampList, Np2D]:
"""
Compute QR decomposition of a matrix through Householder method.
Args:
A: matrix to be decomposed;
Q0(optional): initial orthogonal guess for Q; defaults to random;
only_R(optional): whether to skip reconstruction of Q; defaults to
False.
Returns:
Resulting (orthogonal and )upper-triangular factor(s).
"""
N = A.shape[1]
B = copy(A)
V = copy(A)
R = np.zeros((N, N), dtype = A.dtype)
if Q0 is None:
Q = sampleList(np.zeros(A.shape, dtype = A.dtype)
+ np.random.randn(*(A.shape)))
else:
Q = copy(Q0)
for k in range(N):
if Q0 is None:
Q[k], _, _ = self.GS(Q[k], Q, k)
a = B[k]
R[k, k] = self.HFEngine.norm(a)
alpha = self.HFEngine.innerProduct(a, Q[k])
if np.isclose(np.abs(alpha), 0.): s = 1.
else: s = - alpha / np.abs(alpha)
Q[k] = s * Q[k]
V[k], _, _ = self.GS(R[k, k] * Q[k] - a, Q, k)
J = np.arange(k + 1, N)
vtB = self.HFEngine.innerProduct(B[J], V[k])
- B[J] = B[J] - 2 * np.outer(V[k], vtB)
+ B.data[:, J] -= 2 * np.outer(V[k], vtB)
R[k, J] = self.HFEngine.innerProduct(B[J], Q[k])
- B[J] = B[J] - np.outer(Q[k], R[k, J])
+ B.data[:, J] -= np.outer(Q[k], R[k, J])
if only_R:
return R
for k in range(N - 1, -1, -1):
- J = np.arange(k, N)
+ J = list(range(k, N))
vtQ = self.HFEngine.innerProduct(Q[J], V[k])
- Q[J] = Q[J] - 2 * np.outer(V[k], vtQ)
+ Q.data[:, J] -= 2 * np.outer(V[k], vtQ)
return Q, R
diff --git a/rrompy/sampling/base/sampling_engine_base.py b/rrompy/sampling/base/sampling_engine_base.py
index 292204f..4619855 100644
--- a/rrompy/sampling/base/sampling_engine_base.py
+++ b/rrompy/sampling/base/sampling_engine_base.py
@@ -1,194 +1,195 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from rrompy.utilities.base.types import (Np1D, HFEng, strLst, paramVal,
paramList, sampList)
from rrompy.utilities.base import verbosityDepth
from rrompy.utilities.exception_manager import RROMPyWarning
from rrompy.parameter import (emptyParameterList, checkParameter,
checkParameterList)
from rrompy.sampling import emptySampleList
__all__ = ['SamplingEngineBase']
class SamplingEngineBase:
"""HERE"""
def __init__(self, HFEngine:HFEng, verbosity : int = 10,
- timestamp : bool = True):
+ timestamp : bool = True, allowRepeatedSamples : bool = True):
self.verbosity = verbosity
self.timestamp = timestamp
+ self.allowRepeatedSamples = allowRepeatedSamples
if self.verbosity >= 10:
verbosityDepth("INIT",
"Initializing sampling engine of type {}.".format(
self.name()),
timestamp = self.timestamp)
self.HFEngine = HFEngine
if self.verbosity >= 10:
verbosityDepth("DEL", "Done initializing sampling engine.",
timestamp = self.timestamp)
def name(self) -> str:
return self.__class__.__name__
def __str__(self) -> str:
return self.name()
def __repr__(self) -> str:
return self.__str__() + " at " + hex(id(self))
def resetHistory(self):
self.samples = emptySampleList()
self.nsamples = 0
self.mus = emptyParameterList()
self._derIdxs = []
def popSample(self):
if hasattr(self, "nsamples") and self.nsamples > 1:
if self.samples.shape[1] > self.nsamples:
RROMPyWarning(("More than 'nsamples' memory allocated for "
"samples. Popping empty sample column."))
self.nsamples += 1
self.nsamples -= 1
self.samples.pop()
self.mus.pop()
else:
self.resetHistory()
def preallocateSamples(self, u:sampList, mu:paramVal, n:int):
self.samples.reset((u.shape[0], n), u.dtype)
self.samples[0] = u
mu = checkParameter(mu, self.HFEngine.npar)
self.mus.reset((n, self.HFEngine.npar))
self.mus[0] = mu[0]
@property
def HFEngine(self):
"""Value of HFEngine. Its assignment resets history."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
self._HFEngine = HFEngine
self.resetHistory()
def solveLS(self, mu : paramList = [], RHS : sampList = None,
homogeneized : bool = False) -> sampList:
"""
Solve linear system.
Args:
mu: Parameter value.
Returns:
Solution of system.
"""
mu, _ = checkParameterList(mu, self.HFEngine.npar)
if self.verbosity >= 15:
verbosityDepth("INIT", "Solving HF model for mu = {}.".format(mu),
timestamp = self.timestamp)
u = self.HFEngine.solve(mu, RHS, homogeneized)
if self.verbosity >= 15:
verbosityDepth("DEL", "Done solving HF model.",
timestamp = self.timestamp)
return u
def plotSamples(self, name : str = "u", save : str = None,
what : strLst = 'all', saveFormat : str = "eps",
saveDPI : int = 100, show : bool = True, **figspecs):
"""
Do some nice plots of the samples.
Args:
name(optional): Name to be shown as title of the plots. Defaults to
'u'.
save(optional): Where to save plot(s). Defaults to None, i.e. no
saving.
what(optional): Which plots to do. If list, can contain 'ABS',
'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard 'ALL'.
Defaults to 'ALL'.
saveFormat(optional): Format for saved plot(s). Defaults to "eps".
saveDPI(optional): DPI for saved plot(s). Defaults to 100.
show(optional): Whether to show figure. Defaults to True.
figspecs(optional key args): Optional arguments for matplotlib
figure creation.
"""
for j in range(self.nsamples):
self.HFEngine.plot(self.samples[j], name = "{}_{}".format(name, j),
save = save, what = what,
saveFormat = saveFormat, saveDPI = saveDPI,
show = show, **figspecs)
def outParaviewSamples(self, name : str = "u", folders : bool = True,
filename : str = "out", times : Np1D = None,
what : strLst = 'all', forceNewFile : bool = True,
filePW = None):
"""
Output samples to ParaView file.
Args:
name(optional): Base name to be used for data output.
folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
times(optional): Timestamps.
what(optional): Which plots to do. If list, can contain 'MESH',
'ABS', 'PHASE', 'REAL', 'IMAG'. If str, same plus wildcard
'ALL'. Defaults to 'ALL'.
forceNewFile(optional): Whether to create new output file.
filePW(optional): Fenics File entity (for time series).
"""
if times is None: times = [0.] * self.nsamples
for j in range(self.nsamples):
self.HFEngine.outParaview(self.samples[j],
name = "{}_{}".format(name, j),
filename = "{}_{}".format(filename, j),
time = times[j], what = what,
forceNewFile = forceNewFile,
folder = folders, filePW = filePW)
def outParaviewTimeDomainSamples(self, omegas : Np1D = None,
timeFinal : Np1D = None,
periodResolution : int = 20,
name : str = "u", folders : bool = True,
filename : str = "out",
forceNewFile : bool = True):
"""
Output samples to ParaView file, converted to time domain.
Args:
omegas(optional): frequencies.
timeFinal(optional): final time of simulation.
periodResolution(optional): number of time steps per period.
name(optional): Base name to be used for data output.
folders(optional): Whether to split output in folders.
filename(optional): Name of output file.
forceNewFile(optional): Whether to create new output file.
"""
if omegas is None: omegas = np.real(self.mus)
if not isinstance(timeFinal, (list, tuple,)):
timeFinal = [timeFinal] * self.nsamples
for j in range(self.nsamples):
self.HFEngine.outParaviewTimeDomain(self.samples[j],
omega = omegas[j],
timeFinal = timeFinal[j],
periodResolution = periodResolution,
name = "{}_{}".format(name, j),
filename = "{}_{}".format(filename, j),
forceNewFile = forceNewFile,
folder = folders)
diff --git a/rrompy/sampling/linear_problem/__init__.py b/rrompy/sampling/linear_problem/__init__.py
index 8326efd..397644d 100644
--- a/rrompy/sampling/linear_problem/__init__.py
+++ b/rrompy/sampling/linear_problem/__init__.py
@@ -1,27 +1,29 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
from .sampling_engine_linear import SamplingEngineLinear
from .sampling_engine_linear_pod import SamplingEngineLinearPOD
+from .sampling_engine_linear_pod_naive import SamplingEngineLinearPODNaive
__all__ = [
'SamplingEngineLinear',
- 'SamplingEngineLinearPOD'
+ 'SamplingEngineLinearPOD',
+ 'SamplingEngineLinearPODNaive'
]
diff --git a/rrompy/sampling/linear_problem/sampling_engine_linear.py b/rrompy/sampling/linear_problem/sampling_engine_linear.py
index f0ac79c..d7a6412 100644
--- a/rrompy/sampling/linear_problem/sampling_engine_linear.py
+++ b/rrompy/sampling/linear_problem/sampling_engine_linear.py
@@ -1,105 +1,123 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
+from copy import deepcopy as copy
import numpy as np
from rrompy.sampling.base.sampling_engine_base import SamplingEngineBase
from rrompy.utilities.base.types import Np1D, paramVal, paramList, sampList
from rrompy.utilities.base import verbosityDepth
from rrompy.utilities.exception_manager import RROMPyException
from rrompy.utilities.poly_fitting.polynomial import nextDerivativeIndices
from rrompy.parameter import checkParameter, checkParameterList
from rrompy.sampling import sampleList
__all__ = ['SamplingEngineLinear']
class SamplingEngineLinear(SamplingEngineBase):
"""HERE"""
def preprocesssamples(self, idxs:Np1D) -> sampList:
if self.samples is None or len(self.samples) == 0: return
return self.samples(idxs)
def postprocessu(self, u:sampList, overwrite : bool = False) -> Np1D:
- return u
+ return copy(u)
+
+ def postprocessuBulk(self, u:sampList) -> sampList:
+ return copy(u)
+
+ def lastSampleManagement(self):
+ pass
def _getSampleConcurrence(self, mu:paramVal, previous:Np1D,
homogeneized : bool = False) -> sampList:
if len(previous) >= len(self._derIdxs):
self._derIdxs += nextDerivativeIndices(self._derIdxs,
self.HFEngine.npar,
len(previous) + 1 - len(self._derIdxs))
derIdx = self._derIdxs[len(previous)]
mu = checkParameter(mu, self.HFEngine.npar)
samplesOld = self.preprocesssamples(previous)
RHS = self.HFEngine.b(mu, derIdx, homogeneized = homogeneized)
for j, derP in enumerate(self._derIdxs[: len(previous)]):
diffP = [x - y for (x, y) in zip(derIdx, derP)]
if np.all([x >= 0 for x in diffP]):
RHS -= self.HFEngine.A(mu, diffP).dot(samplesOld[j])
return self.solveLS(mu, RHS = RHS, homogeneized = homogeneized)
def nextSample(self, mu : paramVal = [], overwrite : bool = False,
- homogeneized : bool = False) -> Np1D:
+ homogeneized : bool = False,
+ lastSample : bool = True) -> Np1D:
mu = checkParameter(mu, self.HFEngine.npar)
ns = self.nsamples
muidxs = self.mus.findall(mu[0])
if len(muidxs) > 0:
u = self._getSampleConcurrence(mu, np.sort(muidxs), homogeneized)
else:
u = self.solveLS(mu, homogeneized = homogeneized)
u = self.postprocessu(u, overwrite = overwrite)
if overwrite:
self.samples[ns] = u
self.mus[ns] = mu[0]
else:
if ns == 0:
self.samples = sampleList(u)
else:
self.samples.append(u)
self.mus.append(mu)
self.nsamples += 1
+ if lastSample: self.lastSampleManagement()
return u
def iterSample(self, mus:paramList,
homogeneized : bool = False) -> sampList:
mus, _ = checkParameterList(mus, self.HFEngine.npar)
if self.verbosity >= 5:
verbosityDepth("INIT", "Starting sampling iterations.",
timestamp = self.timestamp)
n = len(mus)
if n <= 0:
raise RROMPyException(("Number of samples must be positive."))
self.resetHistory()
- if self.verbosity >= 7:
- verbosityDepth("MAIN", "Computing sample {}/{}.".format(1, n),
- timestamp = self.timestamp)
- u = self.nextSample(mus[0], homogeneized = homogeneized)
- if n > 1:
- self.preallocateSamples(u, mus[0], n)
- for j in range(1, n):
- if self.verbosity >= 7:
- verbosityDepth("MAIN",
- "Computing sample {}/{}.".format(j + 1, n),
- timestamp = self.timestamp)
- self.nextSample(mus[j], overwrite = True,
- homogeneized = homogeneized)
+
+ if self.allowRepeatedSamples:
+ if self.verbosity >= 7:
+ verbosityDepth("MAIN", "Computing sample {}/{}.".format(1, n),
+ timestamp = self.timestamp)
+ u = self.nextSample(mus[0], homogeneized = homogeneized,
+ lastSample = (n == 1))
+ if n > 1:
+ self.preallocateSamples(u, mus[0], n)
+ for j in range(1, n):
+ if self.verbosity >= 7:
+ verbosityDepth("MAIN", ("Computing sample "
+ "{}/{}.").format(j + 1, n),
+ timestamp = self.timestamp)
+ self.nextSample(mus[j], overwrite = True,
+ homogeneized = homogeneized,
+ lastSample = (n == j + 1))
+ else:
+ self.samples = self.postprocessuBulk(self.solveLS(mus,
+ homogeneized = homogeneized))
+ self.mus = copy(mus)
+ self.nsamples = len(mus)
if self.verbosity >= 5:
verbosityDepth("DEL", "Finished sampling iterations.",
timestamp = self.timestamp)
return self.samples
diff --git a/rrompy/sampling/linear_problem/sampling_engine_linear_pod.py b/rrompy/sampling/linear_problem/sampling_engine_linear_pod.py
index 2f4a100..30b71aa 100644
--- a/rrompy/sampling/linear_problem/sampling_engine_linear_pod.py
+++ b/rrompy/sampling/linear_problem/sampling_engine_linear_pod.py
@@ -1,83 +1,73 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
-import numpy as np
+from copy import deepcopy as copy
from rrompy.sampling.base.pod_engine import PODEngine
from .sampling_engine_linear import SamplingEngineLinear
from rrompy.utilities.base.types import Np1D, paramVal, sampList
-from rrompy.utilities.base import verbosityDepth
+from rrompy.sampling import sampleList
__all__ = ['SamplingEngineLinearPOD']
class SamplingEngineLinearPOD(SamplingEngineLinear):
"""HERE"""
def resetHistory(self):
super().resetHistory()
+ self.samples_full = None
self.RPOD = None
def popSample(self):
if hasattr(self, "nsamples") and self.nsamples > 1:
self.RPOD = self.RPOD[: -1, : -1]
+ self.samples_full.pop()
super().popSample()
@property
def HFEngine(self):
"""Value of HFEngine. Its assignment resets history."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
self._HFEngine = HFEngine
self.resetHistory()
self.PODEngine = PODEngine(self._HFEngine)
def preprocesssamples(self, idxs:Np1D) -> sampList:
- idxMax = np.max(idxs) + 1
- sampleBase = super().preprocesssamples(np.arange(idxMax))
- RPODBase = self.RPOD[: idxMax, idxs]
- return sampleBase.dot(RPODBase)
+ if self.samples_full is None or len(self.samples_full) == 0: return
+ return self.samples_full(idxs)
def postprocessu(self, u:sampList, overwrite : bool = False) -> Np1D:
- if self.verbosity >= 10:
- verbosityDepth("INIT", "Starting orthogonalization.",
- timestamp = self.timestamp)
ns = self.nsamples
- if ns == 0:
- u, r, _ = self.PODEngine.GS(u, np.empty((0, 0)))
- r = r[0]
- else:
- u, r, _ = self.PODEngine.GS(u, self.samples(np.arange(ns)), ns)
if overwrite:
- self.RPOD[: ns + 1, ns] = r
+ self.samples_full[ns] = copy(u)
else:
if ns == 0:
- self.RPOD = r.reshape((1, 1))
+ self.samples_full = sampleList(u)
else:
- self.RPOD=np.block([[ self.RPOD, r[:-1, None]],
- [np.zeros((1, ns)), r[-1]]])
- if self.verbosity >= 10:
- verbosityDepth("DEL", "Done orthogonalizing.",
- timestamp = self.timestamp)
+ self.samples_full.append(u)
return u
+ def lastSampleManagement(self):
+ self.samples, self.RPOD = self.PODEngine.QRHouseholder(
+ self.samples_full)
+
def preallocateSamples(self, u:Np1D, mu:paramVal, n:int):
super().preallocateSamples(u, mu, n)
- r = self.RPOD
- self.RPOD = np.zeros((n, n), dtype = u.dtype)
- self.RPOD[0, 0] = r[0, 0]
-
+ self.samples_full.reset((u.shape[0], n), u.dtype)
+ self.samples_full[0] = u
diff --git a/rrompy/sampling/linear_problem/sampling_engine_linear_pod.py b/rrompy/sampling/linear_problem/sampling_engine_linear_pod_naive.py
similarity index 84%
copy from rrompy/sampling/linear_problem/sampling_engine_linear_pod.py
copy to rrompy/sampling/linear_problem/sampling_engine_linear_pod_naive.py
index 2f4a100..83dddae 100644
--- a/rrompy/sampling/linear_problem/sampling_engine_linear_pod.py
+++ b/rrompy/sampling/linear_problem/sampling_engine_linear_pod_naive.py
@@ -1,83 +1,93 @@
# Copyright (C) 2018 by the RROMPy authors
#
# This file is part of RROMPy.
#
# RROMPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RROMPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with RROMPy. If not, see .
#
import numpy as np
from rrompy.sampling.base.pod_engine import PODEngine
from .sampling_engine_linear import SamplingEngineLinear
from rrompy.utilities.base.types import Np1D, paramVal, sampList
from rrompy.utilities.base import verbosityDepth
-__all__ = ['SamplingEngineLinearPOD']
+__all__ = ['SamplingEngineLinearPODNaive']
-class SamplingEngineLinearPOD(SamplingEngineLinear):
+class SamplingEngineLinearPODNaive(SamplingEngineLinear):
"""HERE"""
def resetHistory(self):
super().resetHistory()
self.RPOD = None
def popSample(self):
if hasattr(self, "nsamples") and self.nsamples > 1:
self.RPOD = self.RPOD[: -1, : -1]
super().popSample()
@property
def HFEngine(self):
"""Value of HFEngine. Its assignment resets history."""
return self._HFEngine
@HFEngine.setter
def HFEngine(self, HFEngine):
self._HFEngine = HFEngine
self.resetHistory()
self.PODEngine = PODEngine(self._HFEngine)
def preprocesssamples(self, idxs:Np1D) -> sampList:
idxMax = np.max(idxs) + 1
sampleBase = super().preprocesssamples(np.arange(idxMax))
RPODBase = self.RPOD[: idxMax, idxs]
return sampleBase.dot(RPODBase)
def postprocessu(self, u:sampList, overwrite : bool = False) -> Np1D:
if self.verbosity >= 10:
verbosityDepth("INIT", "Starting orthogonalization.",
timestamp = self.timestamp)
ns = self.nsamples
if ns == 0:
u, r, _ = self.PODEngine.GS(u, np.empty((0, 0)))
r = r[0]
else:
u, r, _ = self.PODEngine.GS(u, self.samples(np.arange(ns)), ns)
if overwrite:
self.RPOD[: ns + 1, ns] = r
else:
if ns == 0:
self.RPOD = r.reshape((1, 1))
else:
self.RPOD=np.block([[ self.RPOD, r[:-1, None]],
[np.zeros((1, ns)), r[-1]]])
if self.verbosity >= 10:
verbosityDepth("DEL", "Done orthogonalizing.",
timestamp = self.timestamp)
return u
+ def postprocessuBulk(self, u:sampList) -> sampList:
+ if self.verbosity >= 10:
+ verbosityDepth("INIT", "Starting orthogonalization.",
+ timestamp = self.timestamp)
+ u, self.RPOD = self.PODEngine.QRHouseholder(u)
+ if self.verbosity >= 10:
+ verbosityDepth("DEL", "Done orthogonalizing.",
+ timestamp = self.timestamp)
+ return u
+
def preallocateSamples(self, u:Np1D, mu:paramVal, n:int):
super().preallocateSamples(u, mu, n)
r = self.RPOD
self.RPOD = np.zeros((n, n), dtype = u.dtype)
self.RPOD[0, 0] = r[0, 0]