Page MenuHomec4science

ROMApproximantSweeper.py
No OneTemporary

File Metadata

Created
Sun, Sep 29, 06:57

ROMApproximantSweeper.py

#!/usr/bin/python
import os
import csv
import warnings
import numpy as np
from context import utilities
class ROMApproximantSweeper:
"""
ROM approximant sweeper.
Args:
ROMEngine(optional): ROMApproximant class. Defaults to None.
ktars(optional): Array of parameter values to sweep. Defaults to empty
array.
params(optional): List of parameter settings (each as a dict) to
explore. Defaults to single empty set.
mostExpensive(optional): String containing label of most expensive
step, to be executed fewer times. Allowed options are 'HF' and
'Approx'. Defaults to 'HF'.
Attributes:
ROMEngine: ROMApproximant class.
ktars: Array of parameter values to sweep.
params: List of parameter settings (each as a dict) to explore.
mostExpensive: String containing label of most expensive step, to be
executed fewer times.
"""
allowedOutputs = ["HFNorm", "AppNorm", "AppError"]
def __init__(self, ROMEngine : 'ROMApproximant' = None,
ktars : 'numpy 1D array' = np.array([]),
params : "List of dicts" = [{}],
mostExpensive : str = "HF"):
self.ROMEngine = ROMEngine
self.ktars = ktars
self.params = params
self.mostExpensive = mostExpensive
def name(self) -> str:
"""Approximant label."""
return self.__class__.__name__
@property
def mostExpensive(self):
"""Value of mostExpensive."""
return self._mostExpensive
@mostExpensive.setter
def mostExpensive(self, mostExpensive:str):
mostExpensive = mostExpensive.upper()
if mostExpensive not in ["HF", "APPROX"]:
warnings.warn(("Value of mostExpensive not recognized. Overriding "
"to 'HF'."))
mostExpensive = "HF"
self._mostExpensive = mostExpensive
def checkValues(self) -> bool:
"""Check if sweep can be performed."""
if self.ROMEngine is None:
warnings.warn("ROMEngine is missing. Aborting.")
return False
if len(self.ktars) == 0:
warnings.warn("Empty target parameter vector. Aborting.")
return False
if len(self.params) == 0:
warnings.warn("Empty method parameters vector. Aborting.")
return False
return True
def sweep(self, filename : str = "./out", outputs : list = [],
verbose : int = 1):
if not self.checkValues(): return
try:
if outputs.upper() == "ALL":
outputs = self.allowedOutputs + ["poles"]
except:
if len(outputs) == 0:
outputs = self.allowedOutputs
outputs = utilities.purgeList(outputs,
self.allowedOutputs + ["poles"],
listname = "outputs list")
poles = ("poles" in outputs)
if len(outputs) == 0:
warnings.warn("Empty outputs. Aborting.")
return
outParList = self.ROMEngine.parameterList()
Nparams = len(self.params)
allowedParams = self.ROMEngine.parameterList()
while os.path.exists(filename):
filename = filename + "{}".format(np.random.randint(10))
append_write = "w"
initial_row = (outParList + ["kRe", "kIm"]
+ [x for x in self.allowedOutputs if x in outputs]
+ ["type"] + ["poles"] * poles)
with open(filename, append_write, buffering = 1) as fout:
writer = csv.writer(fout, delimiter=",")
writer.writerow(initial_row)
if self.mostExpensive == "HF":
outerSet = self.ktars
innerSet = self.params
elif self.mostExpensive == "APPROX":
outerSet = self.params
innerSet = self.ktars
for outerIdx, outerPar in enumerate(outerSet):
if self.mostExpensive == "HF":
i, ktar = outerIdx, outerPar
elif self.mostExpensive == "APPROX":
j, par = outerIdx, outerPar
self.ROMEngine.approxParameters = {k: par[k] for k in\
par.keys() & allowedParams}
self.ROMEngine.setupApprox()
for innerIdx, innerPar in enumerate(innerSet):
if self.mostExpensive == "APPROX":
i, ktar = innerIdx, innerPar
elif self.mostExpensive == "HF":
j, par = innerIdx, innerPar
self.ROMEngine.approxParameters = {k: par[k] for k in\
par.keys() & allowedParams}
self.ROMEngine.setupApprox()
if verbose >= 1:
print("Set {}/{}\tk_{} = {:.10f}{}"\
.format(j+1, Nparams, i, ktar, " " * 15),
end="\r")
outData = []
if "HFNorm" in outputs:
outData = outData + [self.ROMEngine.HFNorm(ktar)]
if "AppNorm" in outputs:
outData = outData + [self.ROMEngine.approxNorm(ktar)]
if "AppError" in outputs:
outData = outData + [self.ROMEngine.approxError(ktar)]
writeData = []
for parn in outParList:
writeData = (writeData
+ [self.ROMEngine.approxParameters[parn]])
writeData = (writeData + [ktar.real, ktar.imag]
+ outData + [self.ROMEngine.name()])
if poles:
writeData = writeData + list(self.ROMEngine.getPoles())
writer.writerow(str(x) for x in writeData)
if verbose >= 1:
if self.mostExpensive == "APPROX":
print("Set {}/{}\tdone{}".format(j+1, Nparams, " "*25))
elif self.mostExpensive == "HF":
print("Point k_{} = {:.10f}\tdone{}".format(i, ktar,
" " * 25))
self.filename = filename
return self.filename
def read(self, filename:str, restrictions : dict = {},
outputs : list = []) -> dict:
"""
Execute a query on a custom format CSV.
Args:
filename: CSV filename.
restrictions(optional): Parameter configurations to output.
Defaults to empty dictionary, i.e. output all.
outputs(optional): Values to output. Defaults to empty list, i.e.
no output.
Returns:
Dictionary of desired results, with a key for each entry of
outputs, and a numpy 1D array as corresponding value.
"""
def pairKFromCSV(filename:str, kTarget:complex) -> "2-tuple of str":
"""
Find complex point in CSV closer to a prescribed value.
Args:
filename: CSV filename.
zTarget: Target complex value.
Returns:
Strings containing real and imaginary part of desired value, in
the same format as in the CSV file.
"""
ktarsF = np.array([], dtype = complex)
kRetarsF = np.array([], dtype = complex)
kImtarsF = np.array([], dtype = complex)
with open(filename, 'r') as f:
reader = csv.reader(f, delimiter=',')
header = next(reader)
kReindex = header.index('kRe')
kImindex = header.index('kIm')
for row in reader:
try:
if row[kReindex] not in [" ", ""]:
kRetarsF = np.append(kRetarsF, row[kReindex])
kImtarsF = np.append(kImtarsF, row[kImindex])
ktarsF = np.append(ktarsF, float(row[kReindex])
+ 1.j * float(row[kImindex]))
except:
pass
optimalIndex = np.argmin(np.abs(ktarsF - kTarget))
return [kRetarsF[optimalIndex], kImtarsF[optimalIndex]]
with open(filename, 'r') as f:
reader = csv.reader(f, delimiter=',')
header = next(reader)
restrIndices, outputIndices, outputData = {}, {}, {}
for key in restrictions.keys():
try:
restrIndices[key] = header.index(key)
if not isinstance(restrictions[key], list):
restrictions[key] = [restrictions[key]]
restrictions[key] = [str(x) for x in restrictions[key]]
except:
warnings.warn("Ignoring key {} from restrictions".format(
key))
if 'kRe' in restrIndices.keys() or 'kIm' in restrIndices.keys():
if 'kRe' not in restrIndices.keys():
restrIndices['kRe'] = header.index('kRe')
restrictions['kRe'] = [0.] * len(restrictions['kIm'])
elif 'kIm' not in restrIndices.keys():
restrIndices['kIm'] = header.index('kIm')
restrictions['kIm'] = [0.] * len(restrictions['kRe'])
elif len(restrictions['kRe']) != len(restrictions['kIm']):
raise Exception(("The lists of values for kRe and kIm "
"must have the same length."))
for i in range(len(restrictions['kRe'])):
k = (1.0 * float(restrictions['kRe'][i])
+ 1.j * float(restrictions['kIm'][i]))
restrictions['kRe'][i], restrictions['kIm'][i] =\
pairKFromCSV(filename, k)
for key in outputs:
try:
outputIndices[key] = header.index(key)
outputData[key] = np.array([])
except:
warnings.warn("Ignoring key {} from outputs".format(key))
for row in reader:
if all([row[restrIndices[key]] in restrictions[key]\
for key in restrictions.keys()]):
for key in outputIndices.keys():
try:
val = row[outputIndices[key]]
val = int(val)
except:
try:
val = float(val)
except:
val = np.nan
finally:
outputData[key] = np.append(outputData[key], val)
return outputData

Event Timeline