diff --git a/PySONIC/core/batches.py b/PySONIC/core/batches.py index 124d4f7..d430604 100644 --- a/PySONIC/core/batches.py +++ b/PySONIC/core/batches.py @@ -1,366 +1,366 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-05-26 18:52:43 +# @Last Modified time: 2021-06-07 17:27:56 ''' Utility functions used in simulations ''' import os import abc import csv import logging import numpy as np import pandas as pd import multiprocess as mp -from ..utils import logger, si_format, isIterable, rangecode +from ..utils import logger, isIterable, rangecode class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputing results in another queue. ''' def __init__(self, queue_in, queue_out): mp.Process.__init__(self) self.queue_in = queue_in self.queue_out = queue_out logger.debug('Starting %s', self.name) def run(self): while True: nextTask = self.queue_in.get() if nextTask is None: logger.debug('Exiting %s', self.name) self.queue_in.task_done() break answer = nextTask() self.queue_in.task_done() self.queue_out.put(answer) return class Worker: ''' Generic worker class calling a specific function with a given set of parameters. ''' def __init__(self, wid, func, args, kwargs, loglevel): ''' Worker constructor. :param wid: worker ID :param func: function object :param args: list of method arguments :param kwargs: dictionary of optional method arguments :param loglevel: logging level ''' self.id = wid self.func = func self.args = args self.kwargs = kwargs self.loglevel = loglevel def __call__(self): ''' Caller to the function with specific parameters. ''' logger.setLevel(self.loglevel) return self.id, self.func(*self.args, **self.kwargs) class Batch: ''' Generic interface to run batches of function calls. ''' def __init__(self, func, queue): ''' Batch constructor. :param func: function object :param queue: list of list of function parameters ''' self.func = func self.queue = queue def __call__(self, *args, **kwargs): ''' Call the internal run method. ''' return self.run(*args, **kwargs) def getNConsumers(self): ''' Determine number of consumers based on queue length and number of available CPUs. ''' return min(mp.cpu_count(), len(self.queue)) def start(self): ''' Create tasks and results queues, and start consumers. ''' mp.freeze_support() self.tasks = mp.JoinableQueue() self.results = mp.Queue() self.consumers = [Consumer(self.tasks, self.results) for i in range(self.getNConsumers())] for c in self.consumers: c.start() @staticmethod def resolve(params): if isinstance(params, list): args = params kwargs = {} elif isinstance(params, tuple): args, kwargs = params return args, kwargs def assign(self, loglevel): ''' Assign tasks to workers. ''' for i, params in enumerate(self.queue): args, kwargs = self.resolve(params) worker = Worker(i, self.func, args, kwargs, loglevel) self.tasks.put(worker, block=False) def join(self): ''' Put all tasks to None and join the queue. ''' for i in range(len(self.consumers)): self.tasks.put(None, block=False) self.tasks.join() def get(self): ''' Extract and re-order results. ''' outputs, idxs = [], [] for i in range(len(self.queue)): wid, out = self.results.get() outputs.append(out) idxs.append(wid) return [x for _, x in sorted(zip(idxs, outputs))] def stop(self): ''' Close tasks and results queues. ''' self.tasks.close() self.results.close() def run(self, mpi=False, loglevel=logging.INFO): ''' Run batch with or without multiprocessing. ''' if mpi: self.start() self.assign(loglevel) self.join() outputs = self.get() self.stop() else: outputs = [] for params in self.queue: args, kwargs = self.resolve(params) outputs.append(self.func(*args, **kwargs)) return outputs @staticmethod def createQueue(*dims): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param dims: list of lists (or 1D arrays) of input parameters :return: list of parameters (list) for each simulation ''' ndims = len(dims) dims_in = [dims[1], dims[0]] inds_out = [1, 0] if ndims > 2: dims_in += dims[2:] inds_out += list(range(2, ndims)) queue = np.stack(np.meshgrid(*dims_in), -1).reshape(-1, ndims) queue = queue[:, inds_out] return queue.tolist() @staticmethod def printQueue(queue, nmax=20): if len(queue) <= nmax: for x in queue: print(x) else: for x in queue[:nmax // 2]: print(x) print(f'... {len(queue) - nmax} more entries ...') for x in queue[-nmax // 2:]: print(x) class LogBatch(metaclass=abc.ABCMeta): ''' Generic interface to a simulation batch in with real-time input:output caching in a specific log file. ''' delimiter = '\t' # csv delimiter rtol = 1e-9 atol = 1e-16 def __init__(self, inputs, root='.'): ''' Construtor. :param inputs: array of batch inputs :param root: root for IO operations ''' self.inputs = inputs self.root = root self.fpath = self.filepath() @property def root(self): return self._root @root.setter def root(self, value): if not os.path.isdir(value): raise ValueError(f'{value} is not a valid directory') self._root = value @property @abc.abstractmethod def in_key(self): ''' Input key. ''' raise NotImplementedError @property @abc.abstractmethod def out_keys(self): ''' Output keys. ''' raise NotImplementedError @property @abc.abstractmethod def suffix(self): ''' filename suffix ''' raise NotImplementedError @property @abc.abstractmethod def unit(self): ''' Input unit. ''' raise NotImplementedError @property def in_label(self): ''' Input label. ''' return f'{self.in_key} ({self.unit})' @property def inputscode(self): ''' String describing the batch inputs. ''' return rangecode(self.inputs, self.in_key, self.unit) @abc.abstractmethod def corecode(self): ''' String describing the batch core components. ''' raise NotImplementedError def filecode(self): ''' String fully describing the batch. ''' return f'{self.corecode()}_{self.inputscode}_{self.suffix}_results' def filename(self): ''' Batch associated filename. ''' return f'{self.filecode()}.csv' def filepath(self): ''' Batch associated filepath. ''' return os.path.join(self.root, self.filename()) def isFinished(self): if not os.path.isfile(self.fpath): return False if len(self.getSerializedOutput()) != len(self.inputs): return False return True def createLogFile(self): ''' Create batch log file if it does not exist. ''' if not os.path.isfile(self.fpath): logger.debug(f'creating batch log file: "{self.fpath}"') self.writeLabels() else: logger.debug(f'existing batch log file: "{self.fpath}"') def writeLabels(self): ''' Write the column labels of the batch log file. ''' with open(self.fpath, 'w') as csvfile: writer = csv.writer(csvfile, delimiter=self.delimiter) writer.writerow([self.in_label, *self.out_keys]) def writeEntry(self, entry): ''' Write a new input(s):ouput(s) entry in the batch log file. ''' with open(self.fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=self.delimiter) writer.writerow(entry) def getLogData(self): ''' Retrieve the batch log file data (inputs and outputs) as a dataframe. ''' return pd.read_csv(self.fpath, sep=self.delimiter).sort_values(self.in_label) def getInput(self): ''' Retrieve the logged batch inputs as an array. ''' return self.getLogData()[self.in_label].values def getSerializedOutput(self): ''' Retrieve the logged batch outputs as an array (if 1 key) or dataframe (if several). ''' if len(self.out_keys) == 1: return self.getLogData()[self.out_keys[0]].values else: return pd.DataFrame({k: self.getLogData()[k].values for k in self.out_keys}) def getOutput(self): return self.getSerializedOutput() def getEntryIndex(self, entry): ''' Get the index corresponding to a given entry. ''' inputs = self.getInput() if len(inputs) == 0: raise ValueError(f'no entries in batch') close = np.isclose(inputs, entry, rtol=self.rtol, atol=self.atol) imatches = np.where(close)[0] if len(imatches) == 0: raise ValueError(f'{entry} entry not found in batch log') elif len(imatches) > 1: raise ValueError(f'duplicate {entry} entry found in batch log') return imatches[0] def getEntryOutput(self, entry): imatch = self.getEntryIndex(entry) return self.getSerializedOutput()[imatch] def isEntry(self, value): ''' Check if a given input is logged in the batch log file. ''' try: self.getEntryIndex(value) return True except ValueError: return False @abc.abstractmethod def compute(self, x): ''' Compute the necessary output(s) for a given input. ''' raise NotImplementedError def computeAndLog(self, x): ''' Compute output(s) and log new entry only if input is not already in the log file. ''' if not self.isEntry(x): logger.debug(f'entry not found: "{x}"') out = self.compute(x) if not isIterable(x): x = [x] if not isIterable(out): out = [out] entry = [*x, *out] if not self.mpi: self.writeEntry(entry) return entry else: logger.debug(f'existing entry: "{x}"') return None def run(self, mpi=False): ''' Run the batch and return the output(s). ''' self.createLogFile() if len(self.getLogData()) < len(self.inputs): batch = Batch(self.computeAndLog, [[x] for x in self.inputs]) self.mpi = mpi outputs = batch.run(mpi=mpi, loglevel=logger.level) outputs = filter(lambda x: x is not None, outputs) if mpi: for out in outputs: self.writeEntry(out) self.mpi = False else: logger.debug('all entries already present') return self.getOutput() diff --git a/PySONIC/core/drives.py b/PySONIC/core/drives.py index 07dd6b5..4cb6674 100644 --- a/PySONIC/core/drives.py +++ b/PySONIC/core/drives.py @@ -1,434 +1,440 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2020-01-30 11:46:47 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-05-25 14:29:03 +# @Last Modified time: 2021-06-07 23:52:02 import abc import numpy as np from .stimobj import StimObject from ..constants import * from .batches import Batch class Drive(StimObject): ''' Generic interface to drive object. ''' @abc.abstractmethod def compute(self, t): ''' Compute the input drive at a specific time. :param t: time (s) :return: specific input drive ''' raise NotImplementedError @classmethod def createQueue(cls, *args): ''' Create a list of Drive objects for combinations of input parameters. ''' if len(args) == 1: return [cls(item) for item in args[0]] else: return [cls(*item) for item in Batch.createQueue(*args)] @property def is_searchable(self): return False class XDrive(Drive): ''' Drive object that can be titrated to find the threshold value of one of its inputs. ''' xvar_initial = None xvar_rel_thr = None xvar_thr = None xvar_precheck = False @property @abc.abstractmethod def xvar(self): raise NotImplementedError @xvar.setter @abc.abstractmethod def xvar(self, value): raise NotImplementedError def updatedX(self, value): other = self.copy() other.xvar = value return other @property def is_searchable(self): return True @property def is_resolved(self): return self.xvar is not None def nullCopy(self): return self.copy().updatedX(0.) class ElectricDrive(XDrive): ''' Electric drive object with constant amplitude. ''' xkey = 'I' xvar_initial = ESTIM_AMP_INITIAL xvar_rel_thr = ESTIM_REL_CONV_THR xvar_range = (0., ESTIM_AMP_UPPER_BOUND) def __init__(self, I): ''' Constructor. :param I: current density (mA/m2) ''' self.I = I @property def I(self): return self._I @I.setter def I(self, value): if value is not None: value = self.checkFloat('I', value) self._I = value @property def xvar(self): return self.I @xvar.setter def xvar(self, value): self.I = value def copy(self): return self.__class__(self.I) @staticmethod def inputs(): return { 'I': { 'desc': 'current density amplitude', 'label': 'I', 'unit': 'A/m2', 'factor': 1e-3, 'precision': 1 } } def compute(self, t): return self.I class VoltageDrive(Drive): ''' Voltage drive object with a held potential and a step potential. ''' def __init__(self, Vhold, Vstep): ''' Constructor. :param Vhold: held voltage (mV) :param Vstep: step voltage (mV) ''' self.Vhold = Vhold self.Vstep = Vstep @property def Vhold(self): return self._Vhold @Vhold.setter def Vhold(self, value): value = self.checkFloat('Vhold', value) self._Vhold = value @property def Vstep(self): return self._Vstep @Vstep.setter def Vstep(self, value): value = self.checkFloat('Vstep', value) self._Vstep = value def copy(self): return self.__class__(self.Vhold, self.Vstep) @staticmethod def inputs(): return { 'Vhold': { 'desc': 'held voltage', 'label': 'V_{hold}', 'unit': 'V', 'precision': 0, 'factor': 1e-3 }, 'Vstep': { 'desc': 'step voltage', 'label': 'V_{step}', 'unit': 'V', 'precision': 0, 'factor': 1e-3 } } @property def filecodes(self): return { 'Vhold': f'{self.Vhold:.1f}mV', 'Vstep': f'{self.Vstep:.1f}mV', } def compute(self, t): return self.Vstep class AcousticDrive(XDrive): ''' Acoustic drive object with intrinsic frequency and amplitude. ''' xkey = 'A' xvar_initial = ASTIM_AMP_INITIAL xvar_rel_thr = ASTIM_REL_CONV_THR xvar_thr = ASTIM_ABS_CONV_THR xvar_precheck = True def __init__(self, f, A=None, phi=np.pi): ''' Constructor. :param f: carrier frequency (Hz) :param A: peak pressure amplitude (Pa) :param phi: phase (rad) ''' self.f = f self.A = A self.phi = phi @property def f(self): return self._f @f.setter def f(self, value): value = self.checkFloat('f', value) self.checkStrictlyPositive('f', value) self._f = value @property def A(self): return self._A @A.setter def A(self, value): if value is not None: value = self.checkFloat('A', value) self.checkPositiveOrNull('A', value) self._A = value @property def phi(self): return self._phi @phi.setter def phi(self, value): value = self.checkFloat('phi', value) self._phi = value def pdict(self, **kwargs): d = super().pdict(**kwargs) if self.phi == np.pi: del d['phi'] return d @property def xvar(self): return self.A @xvar.setter def xvar(self, value): self.A = value def copy(self): return self.__class__(self.f, self.A, phi=self.phi) @staticmethod def inputs(): return { 'f': { 'desc': 'US drive frequency', 'label': 'f', 'unit': 'Hz', 'precision': 0 }, 'A': { 'desc': 'US pressure amplitude', 'label': 'A', 'unit': 'Pa', 'precision': 2 }, 'phi': { 'desc': 'US drive phase', 'label': '\Phi', 'unit': 'rad', 'precision': 2 } } @property def dt(self): ''' Determine integration time step. ''' return 1 / (NPC_DENSE * self.f) @property def dt_sparse(self): return 1 / (NPC_SPARSE * self.f) @property def periodicity(self): ''' Determine drive periodicity. ''' return 1. / self.f @property def nPerCycle(self): return NPC_DENSE @property def modulationFrequency(self): return self.f def compute(self, t): return self.A * np.sin(2 * np.pi * self.f * t - self.phi) class DriveIterator: def __init__(self, drives): self._drives = drives self._index = 0 def __next__(self): if self._index < len(self._drives): result = self._drives[self._index] self._index += 1 return result raise StopIteration class DriveArray(Drive): def __init__(self, drives): self.drives = {f'drive {i + 1}': s for i, s in enumerate(drives)} def __eq__(self, other): if not isinstance(other, self.__class__): return False if self.ndrives != other.ndrives: return False if list(self.drives.keys()) != list(other.drives.keys()): return False for k, v in self.drives.items(): if other.drives[k] != v: return False return True def toArray(self, dlist, skey='=', jkey=', ', wraplist=True): # Arrange parameters as key: list of values dictionary (with right-hand sides of split key) - d = {k: [x[k].split(skey)[1] for x in dlist] for k in dlist[0].keys()} + d = {} + for k in dlist[0].keys(): + if k == 'phi': + d[k] = [x.get(k, f'phi{skey}3.14rad').split(skey)[1] for x in dlist] + else: + d[k] = [x[k].split(skey)[1] for x in dlist] + # d = {k: [x[k].split(skey)[1] for x in dlist] for k in dlist[0].keys()} # Discard duplicates in each list (while retaining element order) d = {k: [v[i] for i in sorted(np.unique(v, return_index=True)[1])] for k, v in d.items()} # Format each list element as a string dstr = {k: jkey.join(v) for k, v in d.items()} # Wrap multi-values elements if specified if wraplist: dstr = {k: f'[{v}]' if len(d[k]) > 1 else v for k, v in dstr.items()} # Re-add splitkey formulation and return dictionary return {k: f"{k}{skey}{v}" for k, v in dstr.items()} def __repr__(self): pdict = self.toArray([x.pdict() for x in self.drives.values()], skey='=') return f'{self.__class__.__name__}({", ".join(pdict.values())})' def __getitem__(self, i): return list(self.drives.values())[i] def __len__(self): return len(self.drives) def __iter__(self): return DriveIterator(list(self.drives.values())) @staticmethod def inputs(): return self.drives.values()[0].inputs() def copy(self): return self.__class__([x.copy() for x in self.drives.values()]) @property def ndrives(self): return len(self.drives) @property def meta(self): return {k: s.meta for k, s in self.drives.items()} @property def desc(self): pdict = self.toArray([x.pdict() for x in self.drives.values()], skey='=') return ', '.join(pdict.values()) @property def filecodes(self): return self.toArray( [x.filecodes for x in self.drives.values()], skey='_', jkey='_', wraplist=False) def compute(self, t): return sum(s.compute(t) for s in self.drives.values()) def updatedX(self, value): return self.__class__([d.updatedX(value) for d in self.drives.values()]) def nullCopy(self): return self.copy().updatedX(0.) class AcousticDriveArray(DriveArray): def is_monofrequency(self): return all(x.f == self[0].f for x in self) @property def fmax(self): return max(s.f for s in self.drives.values()) @property def fmin(self): return min(s.f for s in self.drives.values()) @property def dt(self): return 1 / (NPC_DENSE * self.fmax) @property def dt_sparse(self): return 1 / (NPC_SPARSE * self.fmax) @property def periodicity(self): if self.is_monofrequency(): return self[0].periodicity # s if self.ndrives > 2: raise ValueError('cannot compute periodicity for more than two drives') return 1 / (self.fmax - self.fmin) @property def nPerCycle(self): return int(self.periodicity // self.dt) @property def modulationFrequency(self): return np.mean([s.f for s in self.drives.values()])